Spark(27) -- SparkSql的分析函数(scala版)

    科技2024-01-16  81

    1. 案例需求分析

    1、需求  有json数据格式如下,分别是三个字段,对应学生姓名,所属班级,所得分数,求每个班级当中分数最高的前N个学生(分组求topN)  创建hive表,加载score.txt文件  利用分组求和topn:row_number(),rank_over,dense_rank_over() 文件内容如下:

    {"name":"a","clazz":1,"score":80} {"name":"b","clazz":1,"score":78} {"name":"c","clazz":1,"score":95} {"name":"d","clazz":2,"score":74} {"name":"e","clazz":2,"score":92} {"name":"f","clazz":3,"score":99} {"name":"g","clazz":3,"score":99} {"name":"h","clazz":3,"score":45} {"name":"i","clazz":3,"score":55} {"name":"j","clazz":3,"score":78}

    2、代码实现如下

    import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SparkSession} object SparkTopN { def main(args: Array[String]): Unit = { //获取sparkSession val sparkSession: SparkSession = SparkSession.builder().appName("sparkTopN").master("local[2]").getOrCreate() //通过sparkSession得到sparkContext,并设置日志级别 val sparkContext: SparkContext = sparkSession.sparkContext sparkContext.setLogLevel("WARN") //读取json格式的数据,得到DataFrame val jsonDF: DataFrame = sparkSession.read.json("file:///.......\\score.txt") val schema: Unit = jsonDF.printSchema() // jsonDF.show() //通过DF创建临时表 jsonDF.createOrReplaceTempView("score") //通过sql语句执行查询 sparkSession.sql("select * from score").show() println("//*************** 求每个班最高成绩学生的信息 ***************/") println("/******* 开窗函数的表 ********/") sparkSession.sql("select name,clazz,score, rank() over(partition by clazz order by score desc) rank from score").show() println("/******* 计算结果的表 *******") sparkSession.sql("select * from " + "( select name,clazz,score,rank() over(partition by clazz order by score desc) rank from score) " + "as t " + "where t.rank=1").show() println("/************求每个班最高成绩学生的信息(groupBY) ***************/") sparkSession.sql("select clazz, max(score) max from score group by clazz").show() sparkSession.sql("select a.name, b.clazz, b.max from score a, " + "(select clazz, max(score) max from score group by clazz) as b " + "where a.score = b.max").show() println("rank()跳跃排序,有两个第二名时后边跟着的是第四名\n" + "dense_rank() 连续排序,有两个第二名时仍然跟着第三名\n" + "over()开窗函数:\n" + " 在使用聚合函数后,会将多行变成一行,而开窗函数是将一行变成多行;\n" + " 并且在使用聚合函数后,如果要显示其他的列必须将列加入到group by中,\n" + " 而使用开窗函数后,可以不使用group by,直接将所有信息显示出来。\n" + " 开窗函数适用于在每一行的最后一列添加聚合函数的结果。\n" + "常用开窗函数:\n" + " 1.为每条数据显示聚合信息.(聚合函数() over())\n" + " 2.为每条数据提供分组的聚合函数结果(聚合函数() over(partition by 字段) as 别名) \n" + " --按照字段分组,分组后进行计算\n" + " 3.与排名函数一起使用(row number() over(order by 字段) as 别名)\n" + "常用分析函数:(最常用的应该是1.2.3 的排序)\n" + " 1、row_number() over(partition by ... order by ...)\n" + " 2、rank() over(partition by ... order by ...)\n" + " 3、dense_rank() over(partition by ... order by ...)\n" + " 4、count() over(partition by ... order by ...)\n" + " 5、max() over(partition by ... order by ...)\n" + " 6、min() over(partition by ... order by ...)\n" + " 7、sum() over(partition by ... order by ...)\n" + " 8、avg() over(partition by ... order by ...)\n" + " 9、first_value() over(partition by ... order by ...)\n" + " 10、last_value() over(partition by ... order by ...)\n" + " 11、lag() over(partition by ... order by ...)\n" + " 12、lead() over(partition by ... order by ...)\n" + "lag 和lead 可以 获取结果集中,按一定排序所排列的当前行的上下相邻若干offset 的某个行的某个列(不用结果集的自关联);\n" + "lag ,lead 分别是向前,向后;\n" + "lag 和lead 有三个参数,第一个参数是列名,第二个参数是偏移的offset,第三个参数是 超出记录窗口时的默认值") sparkContext.stop() sparkSession.close() } }

    2. 开窗函数

    2.1 概述

    介绍  开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

    聚合函数和开窗函数 聚合函数是将多行变成一行,count,avg… 开窗函数是将一行变成多行; 聚合函数如果要显示其他的列必须将列加入到group by中 开窗函数可以不使用group by,直接将所有信息显示出来

    开窗函数分类

    聚合开窗函数 聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。排序开窗函数 排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

    2.2 准备工作

    /export/servers/spark/bin/spark-shell --master spark://node01:7077,node02:7077 case class Score(name: String, clazz: Int, score: Int) val scoreDF = spark.sparkContext.makeRDD(Array( Score("a1", 1, 80), Score("a2", 1, 78), Score("a3", 1, 95), Score("a4", 2, 74), Score("a5", 2, 92), Score("a6", 3, 99), Score("a7", 3, 99), Score("a8", 3, 45), Score("a9", 3, 55), Score("a10", 3, 78), Score("a11", 3, 100)) ).toDF("name", "class", "score") scoreDF.createOrReplaceTempView("scores") scoreDF.show() +----+-----+-----+ |name|class|score| +----+-----+-----+ | a1| 1| 80| | a2| 1| 78| | a3| 1| 95| | a4| 2| 74| | a5| 2| 92| | a6| 3| 99| | a7| 3| 99| | a8| 3| 45| | a9| 3| 55| | a10| 3| 78| | a11| 3| 100| +----+-----+-----+

    2.3 聚合开窗函数

    示例1 OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。 SQL标准允许将所有聚合函数用做聚合开窗函数。 spark.sql("select count(name) from scores").show spark.sql("select name, class, score, count(name) over() name_count from scores").show

    查询结果如下所示:

    +----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 11| | a2| 1| 78| 11| | a3| 1| 95| 11| | a4| 2| 74| 11| | a5| 2| 92| 11| | a6| 3| 99| 11| | a7| 3| 99| 11| | a8| 3| 45| 11| | a9| 3| 55| 11| | a10| 3| 78| 11| | a11| 3| 100| 11| +----+-----+-----+----------+ 示例2 OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。 如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。

     注意:与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

     下面的 SQL 语句用于显示按照班级分组后每组的人数: OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

    spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

    查询结果如下所示:

    +----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 3| | a2| 1| 78| 3| | a3| 1| 95| 3| | a6| 3| 99| 6| | a7| 3| 99| 6| | a8| 3| 45| 6| | a9| 3| 55| 6| | a10| 3| 78| 6| | a11| 3| 100| 6| | a4| 2| 74| 2| | a5| 2| 92| 2| +----+-----+-----+----------+

    2.4 排序开窗函数

    (1)ROW_NUMBER顺序排序  row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号 注意:  在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。

    示例1 spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 5| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 10| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+

    (2)RANK跳跃排序  rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。  这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数  简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

    示例2 spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a10| 3| 78| 4| | a2| 1| 78| 4| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 9| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+

    (3)DENSE_RANK连续排序  dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。  这个函数并列排名之后的排名只是并列排名加1  简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

    示例3 spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 4| | a1| 1| 80| 5| | a5| 2| 92| 6| | a3| 1| 95| 7| | a6| 3| 99| 8| | a7| 3| 99| 8| | a11| 3| 100| 9| +----+-----+-----+----+ spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 5| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+

    (4)NTILE分组排名[了解]

     ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

    示例4 spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 1| | a4| 2| 74| 2| | a2| 1| 78| 2| | a10| 3| 78| 3| | a1| 1| 80| 3| | a5| 2| 92| 4| | a3| 1| 95| 4| | a6| 3| 99| 5| | a7| 3| 99| 5| | a11| 3| 100| 6| +----+-----+-----+----+ spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+

    3. 完整scala代码

    /** * DESC: * Complete data processing and modeling process steps: * 分组求解TopN * {"name":"a","clazz":1,"score":80} * {"name":"b","clazz":1,"score":78} * {"name":"c","clazz":1,"score":95} * * {"name":"d","clazz":2,"score":74} * {"name":"e","clazz":2,"score":92} * * {"name":"f","clazz":3,"score":99} * {"name":"g","clazz":3,"score":99} * {"name":"h","clazz":3,"score":45} * {"name":"i","clazz":3,"score":55} * {"name":"j","clazz":3,"score":78} */ import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SparkSession} object SparkTopnVersion { def main(args: Array[String]): Unit = { //获取sparkSession val sparkSession: SparkSession = SparkSession.builder().appName("sparkTopN").master("local[2]").getOrCreate() //通过sparkSession得到sparkContext,并设置日志级别 val sparkContext: SparkContext = sparkSession.sparkContext sparkContext.setLogLevel("WARN") //读取json格式的数据,得到DataFrame val path = "D:\\BigData\\Workspace\\SparkProjectTest\\day03_sparkSqlTest\\src\\main\\resources\\score.txt" val jsonDF: DataFrame = sparkSession.read.json(path) val schema: Unit = jsonDF.printSchema() // root // |-- clazz: long (nullable = true) // |-- name: string (nullable = true) // |-- score: long (nullable = true) // jsonDF.show() //通过DF创建临时表 jsonDF.createOrReplaceTempView("score") //通过sql语句执行查询 sparkSession.sql("select * from score").show() // +-----+----+-----+ // |clazz|name|score| // +-----+----+-----+ // | 1| a| 80| // | 1| b| 78| // | 1| c| 95| // | 2| d| 74| // | 2| e| 92| // | 3| f| 99| // | 3| g| 99| // | 3| h| 45| // | 3| i| 55| // | 3| j| 78| // +-----+----+-----+ println("//*************** 求每个班最高成绩学生的信息 ***************/") println("/******* 开窗函数的表 ********/") sparkSession.sql("select name,clazz,score, rank() over(partition by clazz order by score desc) rank from score").show() /* +----+-----+-----+----+ |name|clazz|score|rank| +----+-----+-----+----+ | c| 1| 95| 1| | a| 1| 80| 2| | b| 1| 78| 3| | f| 3| 99| 1| | g| 3| 99| 1| | j| 3| 78| 3| | i| 3| 55| 4| | h| 3| 45| 5| | e| 2| 92| 1| | d| 2| 74| 2| +----+-----+-----+----+*/ println("//*************** 求每个班最高成绩学生的信息 ***************/") println("/******* 开窗函数的表 ********/") println("rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。") sparkSession.sql("select name,clazz,score, rank() over(order by score desc) rank from score").show() sparkSession.sql("select name,clazz,score, rank() over(partition by clazz order by score desc) rank from score").show() // +----+-----+-----+----+ // |name|clazz|score|rank| // +----+-----+-----+----+ // | c| 1| 95| 1| // | a| 1| 80| 2| // | b| 1| 78| 3| // | f| 3| 99| 1| // | g| 3| 99| 1| // | j| 3| 78| 3| // | i| 3| 55| 4| // | h| 3| 45| 5| // | e| 2| 92| 1| // | d| 2| 74| 2| // +----+-----+-----+----+ println("//*************** 求每个班最高成绩学生的信息ROW_NUMBER ***************/") println("/******* 开窗函数的表 ********/") println("row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号\n注意:\n在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。/") sparkSession.sql("select name,clazz,score, row_number() over(order by score) rank from score").show() sparkSession.sql("select name,clazz,score, row_number() over(partition by clazz order by score) rank from score").show() // +----+-----+-----+----+ // |name|clazz|score|rank| // +----+-----+-----+----+ // | b| 1| 78| 1| // | a| 1| 80| 2| // | c| 1| 95| 3| // | h| 3| 45| 1| // | i| 3| 55| 2| // | j| 3| 78| 3| // | f| 3| 99| 4| // | g| 3| 99| 5| // | d| 2| 74| 1| // | e| 2| 92| 2| // +----+-----+-----+----+ println("dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。\n这个函数并列排名之后的排名只是并列排名加1\n简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名") sparkSession.sql("select name,clazz,score, dense_rank() over(order by score) rank from score").show() sparkSession.sql("select name,clazz,score, dense_rank() over(partition by clazz order by score) rank from score").show() // +----+-----+-----+----+ // |name|clazz|score|rank| // +----+-----+-----+----+ // | b| 1| 78| 1| // | a| 1| 80| 2| // | c| 1| 95| 3| // | h| 3| 45| 1| // | i| 3| 55| 2| // | j| 3| 78| 3| // | f| 3| 99| 4| // | g| 3| 99| 4| // | d| 2| 74| 1| // | e| 2| 92| 2| // +----+-----+-----+----+ println("ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。") sparkSession.sql("select name,clazz,score, ntile(6) over(order by score) rank from score").show() sparkSession.sql("select name,clazz,score, ntile(6) over(partition by clazz order by score) rank from score").show() // +----+-----+-----+----+ // |name|clazz|score|rank| // +----+-----+-----+----+ // | b| 1| 78| 1| // | a| 1| 80| 2| // | c| 1| 95| 3| // | h| 3| 45| 1| // | i| 3| 55| 2| // | j| 3| 78| 3| // | f| 3| 99| 4| // | g| 3| 99| 5| // | d| 2| 74| 1| // | e| 2| 92| 2| // +----+-----+-----+----+ sparkContext.stop() sparkSession.close() } }
    Processed: 0.014, SQL: 8