在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。我们根据每个用户的 IP地址,与我们的IP地址段进行比较,确认每个IP落在哪一个IP端内,获取经纬度,然后绘制热力图 因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip 段,统计热点经纬度。
因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计 算spark来实现上述功能。
搭建Spark集群
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
下面的是经过排序的IP地址段,能够使用什么方法将用户IP匹配到IP段
二分查找–条件 需要数据从小到达排序IP地址的Long类型的转化附:上面使用到了广播变量(后续详细说,这里简单解释一下) 官网解释:(我这里直接谷歌自带翻译了) 假如说有1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,通过网络传输到各个task中去,给task使用。总计有1G的数据,会通过网络传输。网络传输的开销很大,也许就会消耗掉你的spark作业运行的总时间的一部分。 map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一下子就耗费掉1G的内存。对性能会有什么影响呢?不必要的内存的消耗和占用, 就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导 致后续的操作在磁盘IO上消耗性能; 你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁 GC的话,对Spark作业的运行的速度会有相当可观的影响。
所以我们要避免这种情况的发生,则引入了广播变量。
广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。
使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php
1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。2)、数据格式 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL 用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID 3)、数据下载:分为三个数据集,大小不一样 迷你版(样例数据, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip精简版(1天数据,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析: 使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。
使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。 官方网站:http://www.hanlp.com/,添加Maven依赖
<!-- https://mvnrepository.com/artifact/com.hankcs/hanlp --> <dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>portable-1.7.7</version> </dependency>演示范例:HanLP 入门案例,基本使用
import java.util import com.hankcs.hanlp.HanLP import com.hankcs.hanlp.seg.common.Term import com.hankcs.hanlp.tokenizer.StandardTokenizer import scala.collection.JavaConverters._ /** * HanLP 入门案例,基本使用 */ object HanLPTest { def main(args: Array[String]): Unit = { // 入门Demo val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频") println(terms) println(terms.asScala.map(_.word.trim)) // 标准分词 val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳") println(terms1) println(terms1.asScala.map(_.word.replaceAll("\\s+", ""))) val words: Array[String] = """00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""" .split("\\s+") words.foreach(println) println(words(2).replaceAll("\\[|\\]", "")) } }将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:
/** * 用户搜索点击网页记录Record * @param queryTime 访问时间,格式为:HH:mm:ss * @param userId 用户ID * @param queryWords 查询词 * @param resultRank 该URL在返回结果中的排名 * @param clickRank 用户点击的顺序号 * @param clickUrl 用户点击的URL */ case class SogouRecord( queryTime: String, // userId: String, // queryWords: String, // resultRank: Int, // clickRank: Int, // clickUrl: String // )先读取数据,封装到SougoRecord类中,再按照业务处理数据。
构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。
// TODO: 1. 本地读取SogouQ用户查询日志数据 //val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample") val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced") //println(s"Count = ${rawLogsRDD.count()}") // TODO: 2. 解析数据,封装到CaseClass样例类中 val recordsRDD: RDD[SogouRecord] = rawLogsRDD // 过滤不合法数据,如null,分割后长度不等于6 .filter(log => null != log && log.trim.split("\\s+").length == 6) // 对每个分区中数据进行解析,封装到SogouRecord .mapPartitions{iter => iter.map{log => val arr: Array[String] = log.trim.split("\\s+") SogouRecord( arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), // arr(3).toInt, arr(4).toInt, arr(5) // ) } } println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount程序,具体代码如下:
// =================== 3.1 搜索关键词统计 =================== // a. 获取搜索词,进行中文分词 val wordsRDD: RDD[String] = recordsRDD.mapPartitions{iter => iter.flatMap{record => // 使用HanLP中文分词库进行分词 val terms: util.List[Term] = HanLP.segment(record.queryWords.trim) // 将Java中集合对转换为Scala中集合对象 import scala.collection.JavaConverters._ terms.asScala.map(term => term.word) } } //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}") // b. 统计搜索词出现次数,获取次数最多Top10 val top10SearchWords: Array[(Int, String)] = wordsRDD .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .map(tuple => tuple.swap) .sortByKey(ascending = false) // 词频降序排序 .take(10) // 获取前10个搜索词 top10SearchWords.foreach(println)运行结果如下,仅仅显示搜索最多关键词,其中需要过滤谓词:
统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。
// =================== 3.2 用户搜索点击次数统计 =================== /* 每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度 先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数 */ val clickCountRDD: RDD[((String, String), Int)] = recordsRDD .map{record => // 获取用户ID和搜索词 val key = record.userId -> record.queryWords (key, 1) } // 按照用户ID和搜索词组合的Key分组聚合 .reduceByKey((tmp, item) => tmp + item) clickCountRDD .sortBy(tuple => tuple._2, ascending = false) .take(10).foreach(println) println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}") println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}") println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")程序运行结果如下:
按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:
// =================== 3.3 搜索时间段统计 =================== /* 从搜索时间字段获取小时,统计个小时搜索次数 */ val hourSearchRDD: RDD[(String, Int)] = recordsRDD // 提取小时 .map{record => // 03:12:50 record.queryTime.substring(0, 2) } // 分组聚合 .map(word => (word, 1)) // 每个单词出现一次 .reduceByKey((tmp, item) => tmp + item) // 分组统计次数 .sortBy(tuple => tuple._2, ascending = false) hourSearchRDD.foreach(println)程序运行结果如下: