Spark (10) -- A Clickstream Log Analysis Case Implemented in Spark (Scala Version)


    Project structure:

    1. Page views (PV)

    package com.erainm

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object PV {
      def main(args: Array[String]): Unit = {
        // Create a SparkConf, set the AppName and Master, and create the SparkContext
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("PV").setMaster("local[2]"))
        // Read the data
        val dataRDD: RDD[String] = sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")
        // Map every line to the pair ("PV", 1)
        val pvAndOne: RDD[(String, Int)] = dataRDD.map(x => ("PV", 1))
        // Aggregate and print the total
        val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_ + _)
        totalPV.foreach(println)
        sc.stop()
      }
    }
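    Since every line of the log contributes exactly one page view, the same total can also be computed without the map/reduceByKey shuffle by simply counting the lines. A minimal alternative sketch, assuming the same input path as above (the object name PVCount is made up for illustration):

    package com.erainm

    import org.apache.spark.{SparkConf, SparkContext}

    object PVCount {
      def main(args: Array[String]): Unit = {
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("PVCount").setMaster("local[2]"))
        // count() returns the number of lines in the file, i.e. the total PV, as a single Long
        val totalPV: Long = sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log").count()
        println(("PV", totalPV))
        sc.stop()
      }
    }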

    2. Unique visitors (UV)

    package com.erainm

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object UV {
      def main(args: Array[String]): Unit = {
        // Create a SparkConf, set the AppName and Master, and create the SparkContext
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("UV").setMaster("local[2]"))
        // Read the data
        val dataRDD: RDD[String] = sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")
        // Split each line on spaces and take the first field, the client IP address
        val ipsRDD: RDD[String] = dataRDD.map(_.split(" ")).map(x => x(0))
        // Deduplicate the IP addresses, then map each one to ("UV", 1)
        val distinctUV: RDD[(String, Int)] = ipsRDD.distinct().map(x => ("UV", 1))
        // Aggregate and print the total
        val totalUVRDD: RDD[(String, Int)] = distinctUV.reduceByKey(_ + _)
        totalUVRDD.foreach(println(_))
        // Save the result to the local file system
        totalUVRDD.saveAsTextFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/out")
        sc.stop()
      }
    }
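    Note that saveAsTextFile will fail if the out directory already exists, so it has to be removed between runs. The same UV figure can also be obtained more directly by counting the distinct IP addresses; a minimal sketch of that variant, assuming the same input path (the object name UVCount is made up for illustration):

    package com.erainm

    import org.apache.spark.{SparkConf, SparkContext}

    object UVCount {
      def main(args: Array[String]): Unit = {
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("UVCount").setMaster("local[2]"))
        // Take the first space-separated field (the client IP), deduplicate, and count what remains
        val totalUV: Long = sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")
          .map(_.split(" ")(0))
          .distinct()
          .count()
        println(("UV", totalUV))
        sc.stop()
      }
    }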

    3. Top-N referrer URLs

    package com.erainm

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object TopN {
      def main(args: Array[String]): Unit = {
        // Create a SparkConf, set the AppName and Master, and create the SparkContext
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("TopN").setMaster("local[2]"))
        sc.setLogLevel("WARN")
        // Read the data
        val dataRDD: RDD[String] = sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")
        // Split each line, keep only lines with more than 10 fields, and emit (referrer URL, 1)
        val refUrlAndOne: RDD[(String, Int)] = dataRDD.map(_.split(" ")).filter(_.length > 10).map(x => (x(10), 1))
        // Aggregate, sort by count in descending order, and drop the empty referrer "-"
        val resultRDD: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false).filter(x => x._1 != "\"-\"")
        // Use take to get the top N, here the top 5
        val finalResult: Array[(String, Int)] = resultRDD.take(5)
        println(finalResult.toBuffer)
        sc.stop()
      }
    }
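    All three jobs split each line on spaces and rely on fixed field positions: the client IP at index 0 (used by UV) and the quoted referrer URL at index 10 (used by TopN). A minimal sketch of that assumption, using a hypothetical line in the common Nginx/Apache combined log format (not taken from the real access.log):

    object LogFieldSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical sample line; the real access.log is assumed to follow the same layout
        val line = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] " +
          "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 " +
          "\"http://blog.fens.me/nodejs-socketio-chat/\" \"Mozilla/5.0\""
        val fields = line.split(" ")
        println(fields(0))  // 194.237.142.21 -> the IP address used by the UV job
        println(fields(10)) // "http://blog.fens.me/nodejs-socketio-chat/" -> the quoted referrer used by TopN
      }
    }

    Lines whose referrer is the quoted dash "-" carry no source URL, which is why TopN filters them out after aggregation.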
