Spark(11) -- Spark实现ip地址查询案例及Sogou日志分析(scala版)

    科技2022-07-12  130

    1. Spark实现ip地址查询案例

    1.1 需求分析

     在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。我们根据每个用户的 IP地址,与我们的IP地址段进行比较,确认每个IP落在哪一个IP端内,获取经纬度,然后绘制热力图  因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip 段,统计热点经纬度。

    1.2 技术调研

     因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计 算spark来实现上述功能。

    1.3 架构设计

    搭建Spark集群

    1.4 开发流程

    1.4.1 数据准备

    在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍

    1.4.2 用户访问日志数据

    1.4.3 城市ip段信息-(IP经纬度的信息)

    1.5 分析实现思路

    1.5.1 分析

    使用访问日志数据和Ip网段数据判断当前的访问的IP处于什么经纬度对同在一个经纬度端或区间的数据进行统计

    下面的是经过排序的IP地址段,能够使用什么方法将用户IP匹配到IP段

    二分查找–条件 需要数据从小到达排序IP地址的Long类型的转化

    1.5.2 实现思路

    首先程序中加载用户数据,获取用户的IP再次获取IP的地址段的信息–获取IP的经纬度,IP的起始位置和结束位置需要将IP转换为Long类型的数据需要将转换后的数据通过二分法和已经排好序的起始和结束IP段进行二分查找对于处于同一个经纬度的用户使用reduceByKey的方法进行累加执行输出

    1.5.3 scala代码实现

    package com.erainm import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object IpAddressCheck { /** * 将IP地址转换为long类型的数据,便于后面二分查找 * @param ip * @return */ def ipToLong(ip: String): Long = { val split: Array[String] = ip.split("\\.") var returnNum: Long = 0 for (num <- split) { returnNum = num.toLong | returnNum << 8L } returnNum } /** * * @param ipTrue 用户IP * @param ipRulesArray IP的规则 * @return 返回的是用户的ip在IP规则的位置(ipRulesArry位置索引) */ def binarySearch(ipTrue: Long, ipRulesArray: Array[(String, String, String, String)]): Int = { // 1。 定义起始位置 var start = 0 // 2。 定义结束位置 var end = ipRulesArray.length-1 // 4。 循环比较当前元素IpTrue和ipRulesArray的起始IP和结束Ip的大小关系,最终返回的是位置参数 while (start < end){ // 3。 定义中间位置-------需要每次改变中间位置 var middle = (start+end)/2 if (ipTrue>=ipRulesArray(middle)._1.toLong && ipTrue<=ipRulesArray(middle)._2.toLong) { return middle } if (ipTrue<ipRulesArray(middle)._1.toLong) { end = middle - 1 } if (ipTrue>ipRulesArray(middle)._2.toLong){ start = middle + 1 } } 0 } def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setMaster("local[3]").setAppName("IpAddressCheck")) sc.setLogLevel("WARN") // 1. 首先程序加载用户数据,获取用户的IP val ipDataRDD: RDD[String] = sc.textFile("./spark_parent/data/20090121000132.394251.http.format") // 2. 再次获取IP的地址段的信息---获取IP的经纬度、IP的起始和结束位置 val ipRuleRDD: RDD[String] = sc.textFile("./spark_parent/data/ip.txt") val value: RDD[(String, String, String, String)] = ipRuleRDD.map(x => x.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1))) // value.collect().foreach(println(_)) // 这里针对ipRuleRDD使用广播变量 val ipRules: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(value.collect()) val ipValueRDD: RDD[String] = ipDataRDD.map(x => x.split("\\|")).map(x => x(1)) val partitionsRDD: RDD[((String, String), Int)] = ipValueRDD.mapPartitions(iter => { iter.map(ip => { // 具体逻辑 // 获取广播变量的ipRules val ipRulesArray: Array[(String, String, String, String)] = ipRules.value // 3. 将IP需要转换为Long类型的数据 val ipTrue: Long = ipToLong(ip) //println("ipTrue: " + ipTrue) // 4. 需要将转换后的数据通过二分法和已经排序的起始和结束IP段进行二分查找 val ipIndex: Int = binarySearch(ipTrue, ipRulesArray) //println("ipIndex: " + ipIndex) // 需要返回当前用户所在的经纬度 ((ipRulesArray(ipIndex)._3, ipRulesArray(ipIndex)._4), 1) }) }) partitionsRDD.collect().foreach(println(_)) // 5. 对于处于同一个经纬度的用户(或同一个基站的数据)使用reduceByKey的方法进行累加 val resultRDD: RDD[((String, String), Int)] = partitionsRDD.reduceByKey(_ + _) // 6. 执行输出 resultRDD.collect().foreach(println(_)) // 释放资源 sc.stop() } }

    1.5.4 运行结果

    附:上面使用到了广播变量(后续详细说,这里简单解释一下) 官网解释:(我这里直接谷歌自带翻译了)  假如说有1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,通过网络传输到各个task中去,给task使用。总计有1G的数据,会通过网络传输。网络传输的开销很大,也许就会消耗掉你的spark作业运行的总时间的一部分。  map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一下子就耗费掉1G的内存。对性能会有什么影响呢?不必要的内存的消耗和占用, 就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导 致后续的操作在磁盘IO上消耗性能;  你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁 GC的话,对Spark作业的运行的速度会有相当可观的影响。

    所以我们要避免这种情况的发生,则引入了广播变量。

     广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。

    2. Sogou日志分析案例

     使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php

    1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。2)、数据格式 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL 用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID 3)、数据下载:分为三个数据集,大小不一样 迷你版(样例数据, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip精简版(1天数据,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip

    2.1 业务需求

    针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析: 使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。

    2.2 准备工作

    在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。

    2.2.1 HanLP 中文分词

     使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。  官方网站:http://www.hanlp.com/,添加Maven依赖

    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp --> <dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>portable-1.7.7</version> </dependency>

    演示范例:HanLP 入门案例,基本使用

    import java.util import com.hankcs.hanlp.HanLP import com.hankcs.hanlp.seg.common.Term import com.hankcs.hanlp.tokenizer.StandardTokenizer import scala.collection.JavaConverters._ /** * HanLP 入门案例,基本使用 */ object HanLPTest { def main(args: Array[String]): Unit = { // 入门Demo val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频") println(terms) println(terms.asScala.map(_.word.trim)) // 标准分词 val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳") println(terms1) println(terms1.asScala.map(_.word.replaceAll("\\s+", ""))) val words: Array[String] = """00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""" .split("\\s+") words.foreach(println) println(words(2).replaceAll("\\[|\\]", "")) } }

    2.2.2 样例类 SogouRecord

    将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:

    /** * 用户搜索点击网页记录Record * @param queryTime 访问时间,格式为:HH:mm:ss * @param userId 用户ID * @param queryWords 查询词 * @param resultRank 该URL在返回结果中的排名 * @param clickRank 用户点击的顺序号 * @param clickUrl 用户点击的URL */ case class SogouRecord( queryTime: String, // userId: String, // queryWords: String, // resultRank: Int, // clickRank: Int, // clickUrl: String // )

    2.3 业务实现

    先读取数据,封装到SougoRecord类中,再按照业务处理数据。

    2.3.1 读取数据

    构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。

    // TODO: 1. 本地读取SogouQ用户查询日志数据 //val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample") val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced") //println(s"Count = ${rawLogsRDD.count()}") // TODO: 2. 解析数据,封装到CaseClass样例类中 val recordsRDD: RDD[SogouRecord] = rawLogsRDD // 过滤不合法数据,如null,分割后长度不等于6 .filter(log => null != log && log.trim.split("\\s+").length == 6) // 对每个分区中数据进行解析,封装到SogouRecord .mapPartitions{iter => iter.map{log => val arr: Array[String] = log.trim.split("\\s+") SogouRecord( arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), // arr(3).toInt, arr(4).toInt, arr(5) // ) } } println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")

    2.3.2 搜索关键词统计

    获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount程序,具体代码如下:

    // =================== 3.1 搜索关键词统计 =================== // a. 获取搜索词,进行中文分词 val wordsRDD: RDD[String] = recordsRDD.mapPartitions{iter => iter.flatMap{record => // 使用HanLP中文分词库进行分词 val terms: util.List[Term] = HanLP.segment(record.queryWords.trim) // 将Java中集合对转换为Scala中集合对象 import scala.collection.JavaConverters._ terms.asScala.map(term => term.word) } } //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}") // b. 统计搜索词出现次数,获取次数最多Top10 val top10SearchWords: Array[(Int, String)] = wordsRDD .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .map(tuple => tuple.swap) .sortByKey(ascending = false) // 词频降序排序 .take(10) // 获取前10个搜索词 top10SearchWords.foreach(println)

    运行结果如下,仅仅显示搜索最多关键词,其中需要过滤谓词:

    2.3.3 用户搜索点击统计

    统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。

    // =================== 3.2 用户搜索点击次数统计 =================== /* 每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度 先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数 */ val clickCountRDD: RDD[((String, String), Int)] = recordsRDD .map{record => // 获取用户ID和搜索词 val key = record.userId -> record.queryWords (key, 1) } // 按照用户ID和搜索词组合的Key分组聚合 .reduceByKey((tmp, item) => tmp + item) clickCountRDD .sortBy(tuple => tuple._2, ascending = false) .take(10).foreach(println) println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}") println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}") println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

    程序运行结果如下:

    2.3.4 搜索时间段统计

    按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:

    // =================== 3.3 搜索时间段统计 =================== /* 从搜索时间字段获取小时,统计个小时搜索次数 */ val hourSearchRDD: RDD[(String, Int)] = recordsRDD // 提取小时 .map{record => // 03:12:50 record.queryTime.substring(0, 2) } // 分组聚合 .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .sortBy(tuple => tuple._2, ascending = false) hourSearchRDD.foreach(println)

    程序运行结果如下:

    2.3.5 完整代码

    业务实现完整代码SogouQueryAnalysis如下所示: import java.util import com.hankcs.hanlp.HanLP import com.hankcs.hanlp.seg.common.Term import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * 用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。 * 1. 搜索关键词统计,使用HanLP中文分词 * 2. 用户搜索次数统计 * 3. 搜索时间段统计 * 数据格式: * 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL * 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID */ object SogouQueryAnalysis { def main(args: Array[String]): Unit = { // 构建SparkContext上下文实例对象 val sc: SparkContext = { // a. 创建SparkConf对象,设置应用配置信息 val sparkConf = new SparkConf() .setMaster("local[2]") .setAppName(this.getClass.getSimpleName.stripSuffix("$")) // b. 创建SparkContext, 有就获取,没有就创建,建议使用 val context = SparkContext.getOrCreate(sparkConf) // c. 返回对象 context } sc.setLogLevel("WARN") // TODO: 1. 本地读取SogouQ用户查询日志数据 //val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample") val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced") //println(s"Count = ${rawLogsRDD.count()}") // TODO: 2. 解析数据,封装到CaseClass样例类中 val recordsRDD: RDD[SogouRecord] = rawLogsRDD // 过滤不合法数据,如null,分割后长度不等于6 .filter(log => null != log && log.trim.split("\\s+").length == 6) // 对每个分区中数据进行解析,封装到SogouRecord .mapPartitions{iter => iter.map{log => val arr: Array[String] = log.trim.split("\\s+") SogouRecord( arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), // arr(3).toInt, arr(4).toInt, arr(5) // ) } } println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}") // 数据使用多次,进行缓存操作,使用count触发 recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count() // TODO: 3. 依据需求统计分析 /* 1. 搜索关键词统计,使用HanLP中文分词 2. 用户搜索次数统计 3. 搜索时间段统计 */ // =================== 3.1 搜索关键词统计 =================== // a. 获取搜索词,进行中文分词 val wordsRDD: RDD[String] = recordsRDD.mapPartitions{iter => iter.flatMap{record => // 使用HanLP中文分词库进行分词 val terms: util.List[Term] = HanLP.segment(record.queryWords.trim) // 将Java中集合对转换为Scala中集合对象 import scala.collection.JavaConverters._ terms.asScala.map(term => term.word) } } //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}") // b. 统计搜索词出现次数,获取次数最多Top10 val top10SearchWords: Array[(Int, String)] = wordsRDD .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .map(tuple => tuple.swap) .sortByKey(ascending = false) // 词频降序排序 .take(10) // 获取前10个搜索词 top10SearchWords.foreach(println) // =================== 3.2 用户搜索点击次数统计 =================== /* 每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度 先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数 */ val clickCountRDD: RDD[((String, String), Int)] = recordsRDD .map{record => // 获取用户ID和搜索词 val key = record.userId -> record.queryWords (key, 1) } // 按照用户ID和搜索词组合的Key分组聚合 .reduceByKey((tmp, item) => tmp + item) clickCountRDD .sortBy(tuple => tuple._2, ascending = false) .take(10).foreach(println) println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}") println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}") println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}") // =================== 3.3 搜索时间段统计 =================== /* 从搜索时间字段获取小时,统计个小时搜索次数 */ val hourSearchRDD: RDD[(String, Int)] = recordsRDD // 提取小时 .map{record => // 03:12:50 record.queryTime.substring(0, 2) } // 分组聚合 .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .sortBy(tuple => tuple._2, ascending = false) hourSearchRDD.foreach(println) // 释放缓存数据 recordsRDD.unpersist() // 应用结束,关闭资源 sc.stop() } }
    Processed: 0.012, SQL: 8