Spark SQL Case Study (2): Flow Roll-Up
1. Background
Spark SQL supports two API styles, SQL and DSL. Each has its strengths, and in real-world projects you pick whichever fits the situation. The case in this article is abstracted from a scenario that comes up frequently in enterprise development: aggregating certain fields according to a given rule, for example accumulating network flow in segments.
2. Case
Requirement: for each user, merge adjacent access records into one segment whenever the gap between a record's start_time and the previous record's end_time is no more than 10 minutes, then output each segment's earliest start time, latest end time, and accumulated flow. Sample data (uid, start_time, end_time, flow):
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
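To make the rule concrete, here is how the 10-minute threshold (the 60*10 used in the code below) plays out for user 1's five records, working purely from the sample data above. The gaps between consecutive records are 50 s (14:46:30 to 14:47:20), roughly 17 min (15:20:30 to 15:37:23), 61 s (16:05:26 to 16:06:27), and 61 s (17:20:49 to 17:21:50). Only the second gap exceeds 10 minutes, so under this reading of the requirement user 1 should produce two result rows: one segment covering 14:20:30 to 15:20:30 with a total flow of 50, and one covering 15:37:23 to 18:03:27 with a total flow of 150.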
Environment
IDEA 2020, JDK 1.8, Scala 2.12.12, Maven 3.6.3, and the following pom:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.1</spark.version>
    <hbase.version>2.2.5</hbase.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2.1 Code 1 (DSL)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_DSL {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_DSL")
      .master("local")
      .getOrCreate()

    // Explicit schema: uid, start_time, end_time, flow
    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Deduplicate before applying window functions, otherwise duplicate rows
    // would distort the lag() results (the SQL version below does the same).
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
      .distinct()

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // Step 1: for every record, fetch the previous record's end_time within the same uid
    // (ordered by start_time); the first record falls back to its own start_time.
    val dataFrame1: DataFrame = dataFrame.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("lag(end_time, 1, start_time)") over (Window.partitionBy("uid").orderBy("start_time")) as "last_end_time"
    )

    // Step 2: flag = 1 when the gap to the previous record exceeds 10 minutes,
    // i.e. a new segment starts here; otherwise 0.
    val dataFrame2: DataFrame = dataFrame1.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0 )") as "flag"
    )

    // Step 3: a running sum of the flags yields a segment id per uid.
    val dataFrame3: DataFrame = dataFrame2.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      sum($"flag") over (Window.partitionBy("uid").orderBy($"start_time".asc)) as "sum_flag"
    )

    // Step 4: aggregate each (uid, segment): time range, total flow, record count.
    // The grouping columns uid and sum_flag are retained in the result automatically.
    val dataFrame4: DataFrame = dataFrame3
      .groupBy("uid", "sum_flag")
      .agg(
        min("start_time") as "minDate",
        max("end_time") as "maxDate",
        sum("flow") as "total_flow",
        count("*") as "sumed_count"
      )

    dataFrame4.show()
    sparkSession.close()
  }
}
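For readers who want to verify the window-function pattern (lag followed by a running sum) in isolation, without the CSV file, the following is a minimal, self-contained sketch on a tiny in-memory dataset. The object name, column names, and toy values are purely illustrative and not part of the original project; timestamps are epoch seconds to keep the example short.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WindowPatternSketch").master("local").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1", 100L, 200L),   // first record of uid 1
      ("1", 250L, 400L),   // gap 50 s   -> same segment
      ("1", 2000L, 2100L)  // gap 1600 s -> new segment
    ).toDF("uid", "start_ts", "end_ts")

    val w = Window.partitionBy("uid").orderBy("start_ts")

    val segmented = df
      // previous record's end time within the same uid; null for the first record
      .withColumn("last_end_ts", lag($"end_ts", 1).over(w))
      // flag = 1 when the gap exceeds 10 minutes, i.e. a new segment starts
      .withColumn("flag",
        when($"start_ts" - coalesce($"last_end_ts", $"start_ts") > 600, 1).otherwise(0))
      // running sum of the flags turns them into a per-uid segment id
      .withColumn("segment_id", sum($"flag").over(w))

    segmented.show()
    spark.stop()
  }
}

Grouping by uid and segment_id and summing the flow column would then reproduce the roll-up performed above.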
2.2 Code 2 (SQL)
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo_FlowRollUp_SQL {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameDemo_FlowRollUp_SQL")
      .master("local")
      .getOrCreate()

    val structType: StructType = StructType(List(
      StructField("uid", DataTypes.StringType, false),
      StructField("start_time", DataTypes.StringType, false),
      StructField("end_time", DataTypes.StringType, false),
      StructField("flow", DataTypes.DoubleType, false)
    ))

    // Deduplicate the raw data before registering the view.
    val dataFrame: DataFrame = sparkSession.read
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")
      .distinct()

    dataFrame.createTempView("v_flow")

    val dataFrame1: DataFrame = sparkSession.sql(
      """
        |select
        |  uid,
        |  min(start_time) as min_date,
        |  max(end_time)   as max_date,
        |  sum(flow)       as total_flow
        |from
        |(
        |  -- running sum of the flags: each distinct sum_flag value marks one segment per uid
        |  select
        |    uid,
        |    start_time,
        |    end_time,
        |    flow,
        |    sum(flag) over (partition by uid order by start_time asc) as sum_flag
        |  from
        |  (
        |    -- flag = 1 when the gap to the previous record exceeds 10 minutes (a new segment starts)
        |    select
        |      uid,
        |      start_time,
        |      end_time,
        |      flow,
        |      if( to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
        |    from
        |    (
        |      -- previous record's end_time within the same uid; the first record falls back to its own start_time
        |      select
        |        uid,
        |        start_time,
        |        end_time,
        |        flow,
        |        lag(end_time, 1, start_time) over(partition by uid order by start_time asc) as last_end_time
        |      from
        |        v_flow
        |    )
        |  )
        |)
        |group by uid, sum_flag
        |""".stripMargin)

    dataFrame1.show()
    sparkSession.close()
  }
}
2.3 Summary
In enterprise development, SQL usually ends up as several layers of nested subqueries, so maintainability matters. SQL supports comments, so add them wherever you can; nested SQL can also be formatted, so use indentation to make the structure visible at a glance. Maintainability and readability are essential, so pay close attention to them.
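As one way to act on the formatting point, the nested query could also be written with common table expressions, which Spark SQL supports. This is only a sketch of the style: the CTE names below are made up for illustration, and the string would replace the triple-quoted query inside DataFrameDemo_FlowRollUp_SQL.

// Sketch only: the same pipeline expressed with CTEs instead of nested subqueries.
// CTE names (with_last_end, with_flag, with_segment) are illustrative.
val flattenedQuery =
  """
    |with with_last_end as (
    |  -- previous end_time per uid
    |  select uid, start_time, end_time, flow,
    |         lag(end_time, 1, start_time) over (partition by uid order by start_time) as last_end_time
    |  from v_flow
    |),
    |with_flag as (
    |  -- 1 marks the start of a new segment (gap > 10 minutes)
    |  select uid, start_time, end_time, flow,
    |         if(to_unix_timestamp(start_time, 'yyyy-MM-dd HH:mm:ss') - to_unix_timestamp(last_end_time, 'yyyy-MM-dd HH:mm:ss') > 60*10, 1, 0) as flag
    |  from with_last_end
    |),
    |with_segment as (
    |  -- running sum of flags = segment id per uid
    |  select uid, start_time, end_time, flow,
    |         sum(flag) over (partition by uid order by start_time) as sum_flag
    |  from with_flag
    |)
    |select uid, min(start_time) as min_date, max(end_time) as max_date, sum(flow) as total_flow
    |from with_segment
    |group by uid, sum_flag
    |""".stripMargin

Each intermediate step gets a name, which makes the query easier to read top-down and easier to comment.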