Spark(34) -- Spark SQL补充 -- 窗口函数案例

    科技2024-03-31  93

    案例一: 第一名和第二名案例

    目标 掌握如何使用 SQL 和 DataFrame 完成名次统计, 并且对窗口函数有一个模糊的认识, 方便后面的启发

    需求介绍 数据集 product : 商品名称 categroy : 类别 revenue : 收入

    需求分析

    从数据集中得到每个类别收入第一的商品和收入第二的商品关键点是, 每个类别, 收入前两名

    方案1: 使用常见语法子查询 问题1: Spark 和 Hive 这样的系统中, 有自增主键吗? 没有 问题2: 为什么分布式系统中很少见自增主键? 因为分布式环境下数据在不同的节点中, 很难保证顺序 解决方案: 按照某一列去排序, 取前两条数据 遗留问题: 不容易在分组中取每一组的前两个

    SELECT * FROM productRevenue ORDER BY revenue LIMIT 2 Limit只能用于全局而不用作用每个分组

    方案2: 计算每一个类别的按照收入排序的序号, 取每个类别中的前两个 思路步骤

    按照类别分组每个类别中的数据按照收入排序为排序过的数据增加编号取得每个类别中的前两个数据作为最终结果 使用 SQL 就不太容易做到, 需要一个语法, 叫窗口函数(MySQL和Hive都有)

    代码编写 创建初始环境

    创建新的类 WindowFunction编写测试方法初始化 SparkSession创建数据集 class WindowFunction { @Test def firstSecond(): Unit = { val spark = SparkSession.builder() .appName("window") .master("local[6]") .getOrCreate() import spark.implicits._ val data = Seq( ("Thin", "Cell phone", 6000), ("Normal", "Tablet", 1500), ("Mini", "Tablet", 5500), ("Ultra thin", "Cell phone", 5000), ("Very thin", "Cell phone", 6000), ("Big", "Tablet", 2500), ("Bendable", "Cell phone", 3000), ("Foldable", "Cell phone", 3000), ("Pro", "Tablet", 4500), ("Pro2", "Tablet", 6500) ) val source = data.toDF("product", "category", "revenue") } }

    方式一: SQL 语句::

    //source.createOrRe...... SELECT product, category, revenue from ( SELECT product, category, revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank FROM productRevenue) tmp WHERE rank <= 2 // PARTITION BY大小写不限

    窗口函数在 SQL 中的完整语法如下

    function OVER (PARITION BY ... ORDER BY ... FRAME_TYPE BETWEEN ... AND ...)

    方式二: 使用 DataFrame 的命令式 API::

    val window: WindowSpec = Window.partitionBy('category) .orderBy('revenue.desc) source.select('product, 'category, 'revenue, dense_rank() over window as "rank") .where('rank <= 2) .show()

    WindowSpec : 窗口的描述符, 描述窗口应该是怎么样的 dense_rank() over window : 表示一个叫做 dense_rank() 的函数作用于每一个窗口 结果: 总结 在 Spark 中, 使用 SQL 或者 DataFrame 都可以操作窗口 窗口的使用有两个步骤

    定义窗口规则定义窗口函数 在不同的范围内统计名次时, 窗口函数非常得力

    案例2. 最优差值案例

    目标 能够针对每个分类进行计算, 求得常见指标, 并且理解实践上面的一些理论 步骤

    需求介绍代码实现

    需求介绍 源数据集 需求: 统计每个商品和此品类最贵商品之间的差值 目标数据集

    步骤

    创建数据集创建窗口, 按照 revenue 分组, 并倒叙排列应用窗口

    代码

    val spark = SparkSession.builder() .appName("window") .master("local[6]") .getOrCreate() import spark.implicits._ import org.apache.spark.sql.functions._ val data = Seq( ("Thin", "Cell phone", 6000), ("Normal", "Tablet", 1500), ("Mini", "Tablet", 5500), ("Ultra thin", "Cell phone", 5500), ("Very thin", "Cell phone", 6000), ("Big", "Tablet", 2500), ("Bendable", "Cell phone", 3000), ("Foldable", "Cell phone", 3000), ("Pro", "Tablet", 4500), ("Pro2", "Tablet", 6500) ) val source = data.toDF("product", "category", "revenue") val windowSpec = Window.partitionBy('category) .orderBy('revenue.desc) #(max('revenue) over windowSpec) 每一组的最高价格 source.select( 'product, 'category, 'revenue, ((max('revenue) over windowSpec) - 'revenue) as 'revenue_difference ).show()

    Processed: 0.009, SQL: 8