Several Ways to Write WordCount in Spark


    1 Spark Core

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object SparkCoreWordCount {
      def main(args: Array[String]): Unit = {
        // 1. Build the SparkContext from a SparkConf
        val conf: SparkConf = new SparkConf()
          .setMaster("local[2]")
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")

        // 2. Read the text file with 2 partitions
        val inputRDD: RDD[String] = sc.textFile("data/file/wordcount.txt", 2)

        // 3. Split lines into words, map each word to (word, 1) and aggregate the counts
        val resultRDD: RDD[(String, Int)] = inputRDD
          // Filter out empty or blank lines
          .filter(line => null != line && line.trim.length > 0)
          .flatMap(_.split("\\s+"))
          .mapPartitions(iter => iter.map(_ -> 1))
          .reduceByKey(_ + _)

        // 4. Collapse to a single partition and print the result
        resultRDD
          .coalesce(1)
          .foreachPartition(iter => iter.foreach(println))

        sc.stop()
      }
    }
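For reference, a minimal sketch of the same RDD pipeline that additionally sorts by count and writes the result to disk instead of printing it. The output directory name is a placeholder (it must not exist yet), not something from the original post.

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkCoreWordCountToFile {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("SparkCoreWordCountToFile"))
        sc.setLogLevel("WARN")

        sc.textFile("data/file/wordcount.txt", 2)     // same input file as above
          .filter(line => null != line && line.trim.nonEmpty)
          .flatMap(_.split("\\s+"))
          .map(_ -> 1)
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)            // sort by count, descending
          .coalesce(1)
          .saveAsTextFile("data/output/wordcount")    // placeholder output directory

        sc.stop()
      }
    }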

    2 Spark SQL

    2.1 DSL

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    object SparkDSLWordCount {
      def main(args: Array[String]): Unit = {
        // TODO: 1. Build the SparkSession instance via its builder
        val spark: SparkSession = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._

        // TODO: 2. Read the text file data (e.g. from HDFS)
        val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")

        // TODO: 3. Use the DSL (Dataset API), which is similar to the RDD API
        val resultDF: DataFrame = inputDS
          // Filter out empty or blank lines
          .filter(line => null != line && line.trim.length > 0)
          // Split each line into words
          .flatMap(line => line.split("\\s+"))
          // Group by word and count: SELECT word, count(1) FROM tb_words GROUP BY word
          .groupBy("value")
          // count() yields a Long value (BIGINT in database terms)
          .count()

        resultDF.show(10)

        // TODO: Release resources
        spark.stop()
      }
    }
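As an alternative to the untyped groupBy("value"), the typed Dataset API can do the same aggregation with groupByKey and keep a typed result. A minimal sketch, assuming the same input path as above:

    import org.apache.spark.sql.{Dataset, SparkSession}

    object SparkTypedWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SparkTypedWordCount")
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._

        // Same input file as the DSL example above
        val words: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
          .filter(line => null != line && line.trim.nonEmpty)
          .flatMap(_.split("\\s+"))

        // groupByKey keeps the result typed as Dataset[(String, Long)]
        val counts: Dataset[(String, Long)] = words.groupByKey(identity).count()
        counts.show(10)

        spark.stop()
      }
    }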

    2.2 SQL

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    object SparkSQLWordCount {
      def main(args: Array[String]): Unit = {
        // TODO: 1. Build the SparkSession instance via its builder
        val spark: SparkSession = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._

        // TODO: 2. Read the text file data (e.g. from HDFS)
        val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")

        // TODO: 3. Use the DSL (Dataset API), similar to the RDD API, to prepare the words
        val wordsDS: Dataset[String] = inputDS
          // Filter out empty or blank lines
          .filter(line => null != line && line.trim.length > 0)
          // Split each line into words
          .flatMap(line => line.trim.split("\\s+"))
        wordsDS.printSchema()
        wordsDS.show(20)

        // select value, count(1) as cnt from tb_words
        // TODO: Step 1. Register the Dataset as a temporary view
        wordsDS.createOrReplaceTempView("view_tmp_words")
        // TODO: Step 2. Write and run the SQL for the analysis
        val resultDF: DataFrame = spark.sql(
          """
            |SELECT value, COUNT(1) AS cnt FROM view_tmp_words GROUP BY value ORDER BY cnt DESC
          """.stripMargin)
        /*
          +---------+---+
          |    value|cnt|
          +---------+---+
          |    spark| 11|
          |     hive|  6|
          |mapreduce|  4|
          |   hadoop|  3|
          |      sql|  2|
          |     hdfs|  2|
          +---------+---+
         */
        resultDF.show(10)

        // TODO: Release resources
        spark.stop()
      }
    }
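If the aggregated result needs to be persisted rather than only printed, the DataFrameWriter API can be used. A minimal sketch of lines that could be added before spark.stop() in the example above; the output directory is a placeholder:

    import org.apache.spark.sql.SaveMode

    // Persist the word counts as a single CSV file with a header row
    // ("datas/wordcount/result" is a placeholder output directory)
    resultDF
      .coalesce(1)
      .write
      .mode(SaveMode.Overwrite)
      .option("header", "true")
      .csv("datas/wordcount/result")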

    3 SparkStreaming

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingWordCount {
      def main(args: Array[String]): Unit = {
        // TODO: 1. Build the StreamingContext instance
        val ssc: StreamingContext = {
          // a. Create the SparkConf object and set the application configuration
          val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
            .setMaster("local[3]")
          // b. Create the streaming context from the SparkConf; the batch interval
          //    splits the stream into batches (here, one batch every 5 seconds)
          val context = new StreamingContext(sparkConf, Seconds(5))
          // c. Return it
          context
        }

        // TODO: 2. Read data from the source, here a TCP socket
        /*
          def socketTextStream(
              hostname: String,
              port: Int,
              storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
            ): ReceiverInputDStream[String]
         */
        val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
          "node1.itcast.cn", 9999
        )

        // TODO: 3. Count word frequencies within each batch
        val resultDStream: DStream[(String, Int)] = inputDStream
          // Filter out empty or blank lines
          .filter(line => null != line && line.trim.length > 0)
          // Split each line into words by whitespace
          .flatMap(line => line.trim.split("\\s+"))
          // Map each word to a (word, 1) pair
          .map(word => (word, 1))
          // Group by word and aggregate the counts
          .reduceByKey((tmp, item) => tmp + item)

        // TODO: 4. Output the result of each processed batch
        resultDStream.print(10)

        // TODO: 5. A streaming application has to be started explicitly
        ssc.start()
        // Once started, it keeps running (receiving, processing and outputting data)
        // until it is stopped manually or fails
        ssc.awaitTermination()
        // Stop the streaming application
        // (arg 1: whether to stop the SparkContext; arg 2: whether to stop gracefully)
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
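For local testing, the socket source can be fed with a tool such as netcat (nc -lk 9999) on the configured host. The example above only counts words within each 5-second batch; if running totals across batches are needed, DStream's updateStateByKey can be used together with a checkpoint directory. A minimal sketch of the transformation part, assuming the same ssc and inputDStream as above and a placeholder checkpoint path:

    // Checkpointing is required for stateful operations
    // ("data/checkpoint/streaming" is a placeholder directory)
    ssc.checkpoint("data/checkpoint/streaming")

    val totalDStream: DStream[(String, Int)] = inputDStream
      .filter(line => null != line && line.trim.nonEmpty)
      .flatMap(_.trim.split("\\s+"))
      .map(word => (word, 1))
      // Merge this batch's counts into the running total for each word
      .updateStateByKey((batchValues: Seq[Int], state: Option[Int]) =>
        Some(batchValues.sum + state.getOrElse(0))
      )

    totalDStream.print(10)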

    4 StructuredStreaming

    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object StructuredWordCount {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[2]")
          .config("spark.sql.shuffle.partitions", "2")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._

        // Read the stream from a TCP socket source
        val inputStreamDF: DataFrame = spark.readStream
          .format("socket")
          .option("host", "node1")
          .option("port", 9999)
          .load()

        // Split lines into words and count occurrences per word
        val resultStreamDF: DataFrame = inputStreamDF
          .as[String]
          .filter(line => null != line && line.trim.length > 0)
          .flatMap(_.trim.split("\\s+"))
          .groupBy($"value").count()

        // Write the continuously updated counts to the console
        val query: StreamingQuery = resultStreamDF.writeStream
          //.outputMode(OutputMode.Append())
          .outputMode(OutputMode.Update())
          .format("console")
          .option("numRows", "10")
          .option("truncate", "false")
          .start()
        // Once started, the query keeps running until it is stopped or fails
        query.awaitTermination()
        query.stop()
      }
    }
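Since the pom.xml in the appendix also pulls in spark-sql-kafka-0-10, the same word count can read from Kafka instead of a socket. A minimal sketch, in which the broker address and topic name are placeholders:

    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object StructuredKafkaWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[2]")
          .config("spark.sql.shuffle.partitions", "2")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._

        // Kafka source: broker address and topic are placeholders
        val kafkaStreamDF: DataFrame = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "node1:9092")
          .option("subscribe", "wordcount-topic")
          .load()

        // Kafka records expose a binary "value" column; cast it to a string first
        val resultStreamDF: DataFrame = kafkaStreamDF
          .selectExpr("CAST(value AS STRING)")
          .as[String]
          .filter(line => null != line && line.trim.nonEmpty)
          .flatMap(_.trim.split("\\s+"))
          .groupBy($"value").count()

        resultStreamDF.writeStream
          .outputMode(OutputMode.Update())
          .format("console")
          .option("numRows", "10")
          .option("truncate", "false")
          .start()
          .awaitTermination()
      }
    }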

    Appendix 1 pom.xml

    <!-- Repository locations: aliyun, cloudera and jboss, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <kafka.version>2.0.0</kafka.version>
        <mysql.version>8.0.19</mysql.version>
        <jedis.version>3.2.0</jedis.version>
    </properties>

    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming + Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase Client dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Kafka Client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Jedis dependency -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- JSON parsing library: fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>