Spark(32) --Spark SQL补充 -- 聚合

    科技2024-04-17  10

    1. groupBy

     groupBy 算子会按照列将 Dataset 分组, 并返回一个 RelationalGroupedDataset 对象, 通过 RelationalGroupedDataset 可以对分组进行聚合

    Step 1: 加载实验数据

    private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ private val schema = StructType( List( StructField("id", IntegerType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) private val pmDF = spark.read .schema(schema) .option("header", value = true) .csv("dataset/pm_without_null.csv")

    Step 2: 使用 functions 函数进行聚合

    import org.apache.spark.sql.functions._ val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year) groupedDF.agg(avg('pm) as "pm_avg") .orderBy('pm_avg) .show()

    Step 3: 除了使用 functions 进行聚合, 还可以直接使用 RelationalGroupedDataset 的 API 进行聚合

    groupedDF.avg("pm") .orderBy('pm_avg) .show() groupedDF.max("pm") .orderBy('pm_avg) .show()

    2. 多维聚合

     我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下

    Step 1: 准备数据

    private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ private val schemaFinal = StructType( List( StructField("source", StringType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) private val pmFinal = spark.read .schema(schemaFinal) .option("header", value = true) .csv("dataset/pm_final.csv")

    Step 2: 进行多维度聚合

    import org.apache.spark.sql.functions._ val groupPostAndYear = pmFinal.groupBy('source, 'year) .agg(sum("pm") as "pm") val groupPost = pmFinal.groupBy('source) .agg(sum("pm") as "pm") .select('source, lit(null) as "year", 'pm) groupPostAndYear.union(groupPost) .sort('source, 'year asc_nulls_last, 'pm) .show()

    3. rollup 操作符

     rollup 操作符其实就是 groupBy 的一个扩展, rollup 会对传入的列进行滚动 groupBy, groupBy 的次数为列数量 + 1, 最后一次是对整个数据集进行聚合

    Step 1: 创建数据集

    import org.apache.spark.sql.functions._ val sales = Seq( ("Beijing", 2016, 100), ("Beijing", 2017, 200), ("Shanghai", 2015, 50), ("Shanghai", 2016, 150), ("Guangzhou", 2017, 50) ).toDF("city", "year", "amount")

    Step 1: rollup 的操作

    统计每个城市每年的销售额-city+year统计每个城市一共的销售额-city+sum统计总体的销售额-sum sales.rollup("city", "year") .agg(sum("amount") as "amount") .sort($"city".desc_nulls_last, $"year".asc_nulls_last) .show() /** * 结果集: * +---------+----+------+ * | city|year|amount| * +---------+----+------+ * | Shanghai|2015| 50| <-- 上海 2015 的小计 * | Shanghai|2016| 150|<-- 上海 2016 的小计 * | Shanghai|null| 200| <-- 上海的总计 * |Guangzhou|2017| 50| * |Guangzhou|null| 50| * | Beijing|2016| 100| * | Beijing|2017| 200| * | Beijing|null| 300| * | null|null| 550| <-- 整个数据集的总计 * +---------+----+------+ */

    Step 2: 如果使用基础的 groupBy 如何实现效果?

    val cityAndYear = sales .groupBy("city", "year") // 按照 city 和 year 聚合 .agg(sum("amount") as "amount") val city = sales .groupBy("city") // 按照 city 进行聚合 .agg(sum("amount") as "amount") .select($"city", lit(null) as "year", $"amount") val all = sales .groupBy() // 全局聚合 .agg(sum("amount") as "amount") .select(lit(null) as "city", lit(null) as "year", $"amount") cityAndYear .union(city) .union(all) .sort($"city".desc_nulls_last, $"year".asc_nulls_last) .show() /** * 统计结果: * +---------+----+------+ * | city|year|amount| * +---------+----+------+ * | Shanghai|2015| 50| * | Shanghai|2016| 150| * | Shanghai|null| 200| * |Guangzhou|2017| 50| * |Guangzhou|null| 50| * | Beijing|2016| 100| * | Beijing|2017| 200| * | Beijing|null| 300| * | null|null| 550| * +---------+----+------+ */

     很明显可以看到, 在上述案例中, rollup 就相当于先按照 city, year 进行聚合, 后按照 city 进行聚合, 最后对整个数据集进行聚合, 在按照 city 聚合时, year 列值为 null, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null

    4. 使用 rollup 完成 pm 值的统计

    上面的案例使用 rollup 来实现会非常的简单

    import org.apache.spark.sql.functions._ pmFinal.rollup('source, 'year) .agg(sum("pm") as "pm_total") .sort('source.asc_nulls_last, 'year.asc_nulls_last) .show()

    5. cube

    cube 的功能和 rollup 是一样的, 但也有区别, 区别如下

    rollup(A, B).sum© 其结果集中会有三种数据形式: A B C, A null C, null null C 不知道大家发现没, 结果集中没有对 B 列的聚合结果cube(A, B).sum© 其结果集中会有四种数据形式: A B C, A null C, null null C, null B C(也就是cube中不仅包括rollup的groupy A还包括group B) 不知道大家发现没, 比 rollup 的结果集中多了一个 null B C, 也就是说, rollup 只会按照第一个列来进行组合聚合, 但是 cube 会将全部列组合聚合 import org.apache.spark.sql.functions._ pmFinal.cube('source, 'year) .agg(sum("pm") as "pm_total") .sort('source.asc_nulls_last, 'year.asc_nulls_last) .show() /** * 结果集为 * * +-------+----+---------+ * | source|year| pm_total| * +-------+----+---------+ * | dongsi|2013| 735606.0| * | dongsi|2014| 745808.0| * | dongsi|2015| 752083.0| * | dongsi|null|2233497.0| * |us_post|2010| 841834.0| * |us_post|2011| 796016.0| * |us_post|2012| 750838.0| * |us_post|2013| 882649.0| * |us_post|2014| 846475.0| * |us_post|2015| 714515.0| * |us_post|null|4832327.0| * | null|2010| 841834.0| <-- 新增 * | null|2011| 796016.0| <-- 新增 * | null|2012| 750838.0| <-- 新增 * | null|2013|1618255.0| <-- 新增 * | null|2014|1592283.0| <-- 新增 * | null|2015|1466598.0| <-- 新增 * | null|null|7065824.0| * +-------+----+---------+ */

    6. SparkSQL 中支持的 SQL 语句实现 cube 功能

     SparkSQL 支持 GROUPING SETS 语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现 cube 也可以实现 rollup 的功能

    pmFinal.createOrReplaceTempView("pm_final") spark.sql( """ |select source, year, sum(pm) |from pm_final |group by source, year |grouping sets((source, year), (source), (year), ()) |order by source asc nulls last, year asc nulls last """.stripMargin) .show() //()表示全局的groupby

    下面是avg的结果。

    7. RelationalGroupedDataset对象

    常见的 RelationalGroupedDataset 获取方式有三种

    groupByrollupcube  无论通过任何一种方式获取了 RelationalGroupedDataset 对象, 其所表示的都是是一个被分组的 DataFrame, 通过这个对象, 可以对数据集的分组结果进行聚合val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)  需要注意的是, RelationalGroupedDataset 并不是 DataFrame, 所以其中并没有 DataFrame 的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成 DataFrame 对象, 然后就可以再次使用 DataFrame 的算子进行操作了

    Processed: 0.018, SQL: 9