Spark SQL Case Study (2): Flow Accumulation


1. Background

Spark SQL offers two API styles, SQL and DSL. Each has its strengths, and in real enterprise projects you pick whichever fits the situation. The case in this article is abstracted from a scenario that comes up frequently in enterprise development: aggregating certain fields according to a rule, for example accumulating traffic (flow) in segments.
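As a quick illustration of the two styles (a minimal sketch with made-up data, not part of the case below), the same aggregation can be written either way:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("StyleComparison").master("local").getOrCreate()
import spark.implicits._
val df = Seq(("u1", 10.0), ("u1", 20.0), ("u2", 5.0)).toDF("uid", "flow")

// SQL style: register a view and query it with a plain SQL string
df.createTempView("t")
spark.sql("select uid, sum(flow) as total_flow from t group by uid").show()

// DSL style: the same aggregation expressed with column operators
df.groupBy("uid").agg(sum("flow").as("total_flow")).show()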

2. Case Study

Requirement: for each user (uid), merge adjacent records into one segment whenever the gap between the previous record's end_time and the current record's start_time is no more than 10 minutes, then output for every segment the earliest start time, the latest end time and the accumulated flow.

Sample data (uid, start_time, end_time, flow):

1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30

Environment: IDEA 2020, JDK 1.8, Scala 2.12.12, Maven 3.6.3.

pom.xml (relevant fragment):

<!-- Constants used below -->
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.1</spark.version>
    <hbase.version>2.2.5</hbase.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <!-- Scala dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!-- included at compile time, excluded from the packaged jar when enabled -->
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
        <!-- included at compile time, excluded from the packaged jar when enabled -->
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin that compiles Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin that compiles Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Plugin that builds the shaded jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
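To make the merging rule concrete, here is a hand trace of uid 1 (an editorial illustration, using the 10-minute threshold applied in the code below): the gap between 14:46:30 and 14:47:20 is only 50 seconds, so the first two records merge; the gap between 15:20:30 and 15:37:23 is almost 17 minutes, so a new segment starts there; the remaining gaps are about one minute each, so records three to five stay together. uid 1 therefore ends up with two segments, 14:20:30-15:20:30 with accumulated flow 50 and 15:37:23-18:03:27 with accumulated flow 150.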

2.1 Code Version 1 (DSL)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_DSL {

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_DSL")
      .master("local")
      .getOrCreate()

    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
    // dataFrame.show()

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // lag($"end_time", 1, "start_time") cannot take a column as its default value,
    // so the lag is written as a SQL fragment via expr instead.
    // distinct() guards against duplicated input records.
    // Reading the times as a Date type would only resolve them to the day, not to the second,
    // so they are kept as strings and converted later with to_unix_timestamp(..., 'yyyy-MM-dd HH:mm:ss').
    val dataFrame1: DataFrame = dataFrame.select(
      $"uid", $"start_time", $"end_time", $"flow",
      expr("lag(end_time, 1, start_time)") over (Window.partitionBy("uid").orderBy("start_time")) as "last_end_time"
    ).distinct()
    // dataFrame1.show()

    // Use a SQL fragment with if() and to_unix_timestamp() to flag every record whose gap
    // to the previous end_time is larger than 10 minutes (flag = 1 starts a new segment).
    val dataFrame2: DataFrame = dataFrame1.select(
      $"uid", $"start_time", $"end_time", $"flow",
      expr("if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0 )") as "flag"
    )
    // dataFrame2.show()

    // A running sum of the flags assigns every record a segment id within its uid.
    val dataFrame3: DataFrame = dataFrame2.select(
      $"uid", $"start_time", $"end_time", $"flow",
      sum($"flag") over (Window.partitionBy("uid").orderBy($"start_time".asc)) as "sum_flag"
    )
    // dataFrame3.show()

    // Group by uid and segment id, then aggregate each segment
    // (grouping columns are retained automatically, so the extra $"uid" below only adds a duplicate column).
    val dataFrame4: DataFrame = dataFrame3.select(
      $"uid", $"start_time", $"end_time", $"flow", $"sum_flag"
    ).groupBy("uid", "sum_flag").agg(
      $"uid",
      min("start_time") as "minDate",
      max("end_time") as "maxDate",
      sum("flow") as "total_flow",
      count("*") as "sumed_count"
    )

    dataFrame4.show()
    sparkSession.close()
  }
}
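The comment in the code above notes that the DSL overload lag(col, offset, default) will not accept another column as the default value, which is why the lag is written as a SQL fragment. A pure-DSL alternative (my own sketch, reusing dataFrame and the imports from the code above) is to call lag without a default and fall back to start_time with coalesce:

// lag returns null for the first record of each uid; coalesce substitutes start_time,
// so that record's gap is zero and it gets flag 0, just like with the SQL-fragment version.
val w = Window.partitionBy("uid").orderBy("start_time")
val withLastEnd = dataFrame.select(
  $"uid", $"start_time", $"end_time", $"flow",
  coalesce(lag($"end_time", 1).over(w), $"start_time") as "last_end_time"
)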

2.2 Code Version 2 (SQL)

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_SQL {

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_SQL")
      .master("local")
      .getOrCreate()

    // Declaring the schema with a StructType is more efficient and more accurate
    // than letting Spark infer the column names and types while reading.
    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Sample line: 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
    // Source file: E:\DOITLearning\12.Spark\netflowRollupSourceData.txt
    // The DSL is used here to deduplicate the input, which is much simpler than doing it in SQL.
    // Cleaning and deduplicating the source data should be considered at every stage,
    // much like checking the arguments of a function.
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
      .distinct()

    // Register a view before running SQL against the data
    dataFrame.createTempView("v_flow")

    // In real development the query is usually built and tested step by step;
    // the nested subqueries below were developed inside-out, innermost layer first:
    //   1. select uid, start_time, end_time, flow from v_flow
    //   2. add lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
    //   3. add if(to_unix_timestamp(start_time, ...) - to_unix_timestamp(last_end_time, ...) > 60*10, 1, 0) as flag
    //   4. add sum(flag) over (partition by uid order by start_time asc) as sum_flag
    //   5. group by uid, sum_flag and aggregate min(start_time), max(end_time), sum(flow)
    val dataFrame1: DataFrame = sparkSession.sql(
      """
        |select
        |  uid,
        |  min(start_time) as min_date,
        |  max(end_time) as end_time,
        |  sum(flow) as total_flow
        |from
        |(
        |  select
        |    uid, start_time, end_time, flow,
        |    sum(flag) over (partition by uid order by start_time asc) as sum_flag
        |  from
        |  (
        |    select
        |      uid, start_time, end_time, flow,
        |      if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
        |    from
        |    (
        |      select
        |        uid, start_time, end_time, flow,
        |        lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
        |      from
        |      (
        |        select uid, start_time, end_time, flow from v_flow
        |      )
        |    )
        |  )
        |)
        |group by uid, sum_flag
        |""".stripMargin)

    dataFrame1.show()
    sparkSession.close()
  }
}
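Deeply nested subqueries quickly become hard to follow. One way to flatten them (a sketch of my own, not from the original post) is to use common table expressions, which Spark SQL supports; the query below is the same pipeline as above, with each step named and reading top to bottom:

val withCte: DataFrame = sparkSession.sql(
  """
    |with lagged as (
    |  select uid, start_time, end_time, flow,
    |         lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
    |  from v_flow
    |),
    |flagged as (
    |  select uid, start_time, end_time, flow,
    |         if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
    |  from lagged
    |),
    |segmented as (
    |  select uid, start_time, end_time, flow,
    |         sum(flag) over (partition by uid order by start_time asc) as sum_flag
    |  from flagged
    |)
    |select uid, min(start_time) as min_date, max(end_time) as end_time, sum(flow) as total_flow
    |from segmented
    |group by uid, sum_flag
    |""".stripMargin)
withCte.show()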

2.3 Summary

In enterprise development SQL usually nests several subqueries, so maintainability matters a great deal. SQL supports comments, so add them wherever you can. SQL can also be formatted: indent nested queries so their structure stays visible. Maintainable, readable code is extremely important, so pay close attention to it.
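As a small illustration of those last two points (my own sketch, reusing the v_flow view registered in section 2.2), Spark SQL accepts standard "--" line comments inside the query string, so a formatted query can carry a note on every step:

val annotated = sparkSession.sql(
  """
    |-- total flow per user; the 10-minute merging rule is left out to keep the example short
    |select
    |  uid,
    |  sum(flow) as total_flow   -- accumulate the flow column
    |from v_flow                 -- the view registered in the SQL version above
    |group by uid
    |""".stripMargin)
annotated.show()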