1 Spark Core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkCoreWordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // Read the text file into an RDD with 2 partitions
    val inputRDD: RDD[String] = sc.textFile("data/file/wordcount.txt", 2)
    val resultRDD: RDD[(String, Int)] = inputRDD
      // Filter out null and blank lines
      .filter(line => null != line && line.trim.length > 0)
      // Split each line into words
      .flatMap(_.split("\\s+"))
      // Map each word to a (word, 1) pair, partition by partition
      .mapPartitions(iter => iter.map(_ -> 1))
      // Sum the counts for each word
      .reduceByKey(_ + _)
    // Collapse to a single partition and print the results
    resultRDD.coalesce(1)
      .foreachPartition(iter => iter.foreach(println))
    sc.stop()
  }
}
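To persist the counts instead of printing them, the same RDD can be written back to storage before sc.stop(). A minimal sketch, assuming a hypothetical output directory data/output/wordcount (the directory must not already exist):

// Sketch: save the word counts as text, one "word<TAB>count" line per record
resultRDD
  .coalesce(1)  // a single output file is fine for a small demo
  .map { case (word, count) => s"$word\t$count" }
  .saveAsTextFile("data/output/wordcount")  // assumed path; a local path or HDFS both work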
2 Spark SQL
2.1 DSL
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkDSLWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 1. Build the SparkSession instance via the builder pattern
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 2. Read the text file data (from HDFS or the local filesystem)
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
    // TODO: 3. Use the DSL (Dataset API), which resembles the RDD API
    val resultDF: DataFrame = inputDS
      // Filter out invalid (null or blank) lines
      .filter(line => null != line && line.trim.length > 0)
      // Split each line into words
      .flatMap(line => line.split("\\s+"))
      // Group and count by word: SELECT word, count(1) FROM tb_words GROUP BY word
      .groupBy("value")
      // count() yields a Long value (a BigInt in database terms)
      .count()
    resultDF.show(10)
    // TODO: Release resources
    spark.stop()
  }
}
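Because groupBy("value").count() yields a DataFrame with the columns value and count, sorting by frequency before the show(10) call is a one-line change. A small sketch using the same resultDF:

// Sketch: show the most frequent words first
resultDF.orderBy($"count".desc).show(10, truncate = false)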
2.2 SQL
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkSQLWordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
    // Filter and split the lines into a Dataset of words
    val wordsDS: Dataset[String] = inputDS
      .filter(line => null != line && line.trim.length > 0)
      .flatMap(line => line.trim.split("\\s+"))
    wordsDS.printSchema()
    wordsDS.show(20)
    // Register a temporary view and count the words with SQL
    wordsDS.createOrReplaceTempView("view_tmp_words")
    val resultDF: DataFrame = spark.sql(
      """
        |SELECT value, COUNT(1) AS cnt FROM view_tmp_words GROUP BY value ORDER BY cnt DESC
      """.stripMargin)
    resultDF.show(10)
    spark.stop()
  }
}
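The aggregated result can also be written out rather than only shown. A minimal sketch to place before spark.stop(), assuming a hypothetical output directory datas/output/wordcount:

// Sketch: write the word counts as CSV with a header row; the path is an assumed example
resultDF.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("datas/output/wordcount")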
3 Spark Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Build a StreamingContext with a 5-second batch interval
    val ssc: StreamingContext = {
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
      val context = new StreamingContext(sparkConf, Seconds(5))
      context
    }
    // Receive lines of text from a TCP socket source
    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
      "node1.itcast.cn", 9999
    )
    val resultDStream: DStream[(String, Int)] = inputDStream
      .filter(line => null != line && line.trim.length > 0)
      .flatMap(line => line.trim.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey((tmp, item) => tmp + item)
    // Print up to 10 results per batch
    resultDStream.print(10)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
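print(10) shows the first 10 records of each batch in arbitrary order. If the most frequent words should appear first, each batch's RDD can be sorted via transform; a small optional sketch to place before ssc.start():

// Sketch: per-batch top-10 words, sorted by count in descending order
resultDStream
  .transform(rdd => rdd.sortBy(tuple => tuple._2, ascending = false))
  .print(10)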
4 Structured Streaming
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}

object StructuredWordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._
    // Read a streaming DataFrame from a TCP socket source
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
    val resultStreamDF: DataFrame = inputStreamDF
      .as[String]
      .filter(line => null != line && line.trim.length > 0)
      .flatMap(_.trim.split("\\s+"))
      .groupBy($"value").count()
    // Write the running counts to the console in Update mode
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
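For a query that should recover after a restart, Structured Streaming expects a checkpoint location, and the micro-batch pace can be fixed with a trigger. A sketch of the same writeStream call with those two settings added; the checkpoint path datas/ckpt/wordcount is an assumed example:

import org.apache.spark.sql.streaming.Trigger

val query: StreamingQuery = resultStreamDF.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("numRows", "10")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("5 seconds"))          // fire a micro-batch every 5 seconds
  .option("checkpointLocation", "datas/ckpt/wordcount")  // assumed path; needed for recoverable queries
  .start()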
Appendix 1 pom.xml
<repositories>
  <repository>
    <id>aliyun</id>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  </repository>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>jboss</id>
    <url>http://repository.jboss.com/nexus/content/groups/public</url>
  </repository>
</repositories>
<properties>
  <scala.version>2.11.12</scala.version>
  <scala.binary.version>2.11</scala.binary.version>
  <spark.version>2.4.5</spark.version>
  <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
  <hbase.version>1.2.0-cdh5.16.2</hbase.version>
  <kafka.version>2.0.0</kafka.version>
  <mysql.version>8.0.19</mysql.version>
  <jedis.version>3.2.0</jedis.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-hadoop2-compat</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${jedis.version}</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
  </dependency>
</dependencies>
<build>
  <outputDirectory>target/classes</outputDirectory>
  <testOutputDirectory>target/test-classes</testOutputDirectory>
  <resources>
    <resource>
      <directory>${project.basedir}/src/main/resources</directory>
    </resource>
  </resources>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.0</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>