Big Data: Spark


    Case study: users who logged in on three consecutive days

    Data (login.csv):

    uid,dt
    guid01,2018-02-28
    guid01,2018-03-01
    guid01,2018-03-01
    guid01,2018-03-05
    guid01,2018-03-02
    guid01,2018-03-04
    guid01,2018-03-06
    guid01,2018-03-07
    guid02,2018-03-01
    guid02,2018-03-03
    guid02,2018-03-02
    guid02,2018-03-06

    SQL implementation

    package com.doit.spark.day10

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object UsersLoginThreeDaysDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession: SparkSession = SparkSession.builder()
          .appName(this.getClass().getSimpleName)
          .master("local[*]")
          .getOrCreate()

        val dataFrame: DataFrame = sparkSession.read
          .option("header", "true")      // use the first line of the file as the column names
          .option("inferSchema", "true") // infer the column types automatically
          .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\login.csv")

        dataFrame.createTempView("v_user_login")

        sparkSession.sql(
          s"""
             |SELECT
             |  uid,
             |  MIN(dt) start_time,
             |  MAX(dt) end_time,
             |  COUNT(1) num_days
             |FROM
             |(
             |  SELECT
             |    uid, dt,
             |    DATE_SUB(dt, row_num) dis_date
             |  FROM
             |  (
             |    SELECT
             |      uid, dt,
             |      ROW_NUMBER() OVER(PARTITION BY uid ORDER BY dt ASC) as row_num -- window by uid, order by dt, and number the rows
             |    FROM
             |    (
             |      SELECT
             |        dis.uid,
             |        dis.dt
             |      FROM
             |      (
             |        SELECT
             |          DISTINCT(uid, dt) dis -- deduplicate on the (uid, dt) pair
             |        FROM
             |          v_user_login
             |      )
             |    )
             |  )
             |) GROUP BY uid, dis_date HAVING num_days >= 3
             |""".stripMargin).show()

        sparkSession.stop()
      }
    }

    Output:

    +------+----------+----------+--------+
    |   uid|start_time|  end_time|num_days|
    +------+----------+----------+--------+
    |guid02|2018-03-01|2018-03-03|       3|
    |guid01|2018-02-28|2018-03-02|       3|
    |guid01|2018-03-04|2018-03-07|       4|
    +------+----------+----------+--------+
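    To see why grouping by DATE_SUB(dt, row_num) isolates consecutive runs, here is a minimal plain-Scala sketch (no Spark required; the object and variable names are only for illustration) that applies the same arithmetic to guid01's deduplicated login dates:

    import java.time.LocalDate

    object DisDateSketch {
      def main(args: Array[String]): Unit = {
        // guid01's deduplicated login dates, already sorted ascending
        val dts = Seq(
          "2018-02-28", "2018-03-01", "2018-03-02",
          "2018-03-04", "2018-03-05", "2018-03-06", "2018-03-07"
        ).map(s => LocalDate.parse(s))

        dts.zipWithIndex.foreach { case (dt, idx) =>
          val rowNum = idx + 1                       // what ROW_NUMBER() would assign
          val disDate = dt.minusDays(rowNum.toLong)  // what DATE_SUB(dt, row_num) computes
          println(s"$dt - $rowNum day(s) = $disDate")
        }
      }
    }

    Every date in an unbroken run collapses to the same anchor date (2018-02-27 for the three-day run, 2018-02-28 for the four-day run), which is exactly the dis_date value the query groups on before counting the days per group.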

    DSL implementation

    package com.doit.spark.day10

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object UsersLoginThreeDaysDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession: SparkSession = SparkSession.builder()
          .appName(this.getClass().getSimpleName)
          .master("local[*]")
          .getOrCreate()

        val dataFrame: DataFrame = sparkSession.read
          .option("header", "true")      // use the first line of the file as the column names
          .option("inferSchema", "true") // infer the column types automatically
          .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\login.csv")

        // The DSL style needs these two implicit imports
        import sparkSession.implicits._
        import org.apache.spark.sql.functions._

        // deduplicate first
        dataFrame.distinct()
          .select('uid, $"dt",
            row_number() over(Window.partitionBy('uid).orderBy('dt)) as 'row_num
          )
          .select('uid, $"dt",
            date_sub('dt, 'row_num) as 'dis_date
          )
          .groupBy('uid, 'dis_date)
          .agg(
            min('dt), max('dt),
            count('*) as 'count_days
          )
          .where('count_days >= 3)
          .show()

        sparkSession.stop()
      }
    }

    Output:

    +------+----------+----------+----------+----------+
    |   uid|  dis_date|   min(dt)|   max(dt)|count_days|
    +------+----------+----------+----------+----------+
    |guid02|2018-02-28|2018-03-01|2018-03-03|         3|
    |guid01|2018-02-27|2018-02-28|2018-03-02|         3|
    |guid01|2018-02-28|2018-03-04|2018-03-07|         4|
    +------+----------+----------+----------+----------+

    Case study: traffic (flow) statistics

    Data (data.csv; the columns are uid, start_time, end_time, flow):

    uid,start_time,end_time,flow
    1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
    1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
    1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
    1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
    2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
    2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
    2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
    2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
    3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
    3,2020-02-18 15:36:39,2020-02-18 15:24:54,30

    SQL implementation

    package com.doit.spark.day10

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object FowRollupSQL {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()

        val df: DataFrame = spark.read
          .option("header", "true")
          .format("csv")
          .load("D:\\每日总结\\视频\\spark\\spark-day06\\资料\\data.csv")

        // register a view
        df.createTempView("v_user_flow")

        spark.sql(
          s"""
             |SELECT
             |  uid, MIN(start_time), MAX(end_time), SUM(flow)
             |FROM
             |(
             |  SELECT
             |    uid, start_time, end_time, flow,
             |    SUM(num) OVER(PARTITION BY uid ORDER BY start_time) as sum_num -- running sum of the flags within the window, which separates the sessions
             |  FROM
             |  (
             |    SELECT
             |      uid, start_time, end_time, flow,
             |      IF((UNIX_TIMESTAMP(start_time)-UNIX_TIMESTAMP(lag_time))/60 > 10, 1, 0) AS num -- 1 if the gap exceeds 10 minutes, otherwise 0
             |    FROM
             |    (
             |      SELECT
             |        uid, start_time, end_time, flow,
             |        LAG(end_time, 1, start_time) OVER(PARTITION BY uid ORDER BY start_time) as lag_time -- shift end_time down one row; the first row, which would otherwise be null, falls back to its own start_time
             |      FROM
             |        v_user_flow
             |    )
             |  )
             |) GROUP BY uid, sum_num
             |""".stripMargin).show()

        spark.stop()
      }
    }
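    As a sanity check on the LAG/SUM window logic, the following is a minimal plain-Scala sketch (no Spark; the object name is made up, and the hard-coded timestamps are uid 1's records from the sample data) that applies the same "a gap of more than 10 minutes starts a new session" rule:

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit

    object SessionGapSketch {
      def main(args: Array[String]): Unit = {
        val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        // (start_time, end_time) pairs for uid 1, already sorted by start_time
        val records = Seq(
          ("2020-02-18 14:20:30", "2020-02-18 14:46:30"),
          ("2020-02-18 14:47:20", "2020-02-18 15:20:30"),
          ("2020-02-18 15:37:23", "2020-02-18 16:05:26"),
          ("2020-02-18 16:06:27", "2020-02-18 17:20:49"),
          ("2020-02-18 17:21:50", "2020-02-18 18:03:27")
        ).map { case (s, e) => (LocalDateTime.parse(s, fmt), LocalDateTime.parse(e, fmt)) }

        // num = 1 when the gap from the previous end_time exceeds 10 minutes, else 0;
        // the running sum of num is the session id (the LAG + SUM OVER idea above).
        var prevEnd = records.head._1 // like LAG's default: the first row falls back to its own start_time
        var sessionId = 0
        records.foreach { case (start, end) =>
          val gapMinutes = ChronoUnit.MINUTES.between(prevEnd, start)
          if (gapMinutes > 10) sessionId += 1
          println(s"$start  gap=${gapMinutes}min  session=$sessionId")
          prevEnd = end
        }
      }
    }

    Only the third record (a gap of almost 17 minutes) opens a new session, so uid 1 splits into 14:20:30-15:20:30 (flow 20+30=50) and 15:37:23-18:03:27 (flow 40+50+60=150), which matches the result table shown after the DSL version below.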

    DSL-style implementation

    package com.doit.spark.day10

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object FowRollupSQL {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()

        val df: DataFrame = spark.read
          .option("header", "true")
          .format("csv")
          .load("D:\\每日总结\\视频\\spark\\spark-day06\\资料\\data.csv")

        import spark.implicits._
        import org.apache.spark.sql.functions._

        df.select('uid, 'start_time, 'end_time, 'flow,
          expr("lag(end_time,1,start_time)") over(Window.partitionBy('uid).orderBy('start_time)) as 'lag_time // expr() accepts a SQL fragment
        ).select('uid, 'start_time, 'end_time, 'flow,
          expr("if((unix_timestamp(start_time)-unix_timestamp(lag_time))/60 > 10, 1, 0)") as 'num
        ).select('uid, 'start_time, 'end_time, 'flow,
          sum('num) over(Window.partitionBy('uid).orderBy('start_time)) as 'sum_num
        ).groupBy('uid, 'sum_num).agg(
          min('start_time) as 'min_time, max('end_time) as 'max_time, sum('flow) as 'sum_flow
        ).select(
          'uid, 'min_time, 'max_time, 'sum_flow
        ).show()

        spark.stop()
      }
    }

    Output:

    +---+-------------------+-------------------+--------+
    |uid|           min_time|           max_time|sum_flow|
    +---+-------------------+-------------------+--------+
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53|    50.0|
    |  1|2020-02-18 14:20:30|2020-02-18 15:20:30|    50.0|
    |  1|2020-02-18 15:37:23|2020-02-18 18:03:27|   150.0|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40|    20.0|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24|    30.0|
    |  2|2020-02-18 16:01:23|2020-02-18 17:40:52|    90.0|
    +---+-------------------+-------------------+--------+
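    A small note on types: sum_flow comes out as a double (50.0, 150.0, ...) because this reader does not enable inferSchema, so flow is loaded as a string and Spark's sum casts it to double. If integral sums are preferred, one option is to let Spark infer the type when reading, or to cast flow explicitly. A minimal sketch, assuming the same file layout as above (the object name FlowSchemaSketch is made up):

    package com.doit.spark.day10

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object FlowSchemaSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()

        import spark.implicits._
        import org.apache.spark.sql.functions._

        val df: DataFrame = spark.read
          .option("header", "true")
          .option("inferSchema", "true") // let Spark infer flow as an integer type
          .csv("D:\\每日总结\\视频\\spark\\spark-day06\\资料\\data.csv")

        // Alternatively, keep flow as a string and cast it where it is aggregated:
        df.agg(sum('flow.cast("long")) as 'total_flow).show()

        spark.stop()
      }
    }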