Spark SQL: multiple ways to create a DataFrame
1. What is Spark SQL
Official site: http://spark.apache.org/sql/ As the name suggests, Spark SQL lets you use Spark's powerful data-analysis capabilities through SQL instead of writing code. Much like Hive, it greatly lowers the barrier to using the framework: many people can already write SQL, while writing code has a much higher barrier to entry.
2. What is a DataFrame
According to the official introduction, a DataFrame, like an RDD, is a distributed, abstract data container. It does not store the data itself; it records the data source, the operations on the data, and the structural information (the schema), much like a table in a database. Like Hive, it supports nested data types such as struct, map and array. Compared with the RDD API, the DataFrame API sits at a higher level of abstraction, is friendlier to use, and has a lower barrier to entry. A DataFrame can be understood as RDD + schema, the schema being exactly the metadata, or structural information, you often hear about with MySQL. Operations on a DataFrame, whether expressed in SQL or in the DSL, are ultimately translated into DataFrame operations, which in turn become operations on RDDs, and finally a distributed physical plan that is executed to produce the result. Like Hive, Spark SQL optimises along the way: the query is parsed, the logical plan is optimised, a physical plan is chosen, and only then is it executed. In Hive, similarly, the parser turns SQL into a logical plan and the executor turns the logical plan into a physical plan. Tez follows an idea similar to Spark's, combining MapReduce tasks into a DAG to make MapReduce more efficient, but in essence it is still not as fast and convenient as Spark.
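This parse-optimise-execute pipeline can be inspected directly. A minimal sketch, assuming some DataFrame named df already exists (any of the examples built below will do): explain(true) prints the parsed and analyzed logical plans, the optimized logical plan, and the final physical plan that actually runs on RDDs.

    // df is a placeholder for any DataFrame created in the examples below
    // explain(true) prints parsed/analyzed/optimized logical plans plus the physical plan
    df.explain(true)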
3. Spark SQL features
- Easy to integrate: SQL queries can be mixed with regular Spark programs
- Unified data access: the same reader API works across data sources (see the sketch below)
- Convenient Hive integration
- Standard connectivity: JDBC and similar connections can be used
- Good performance and scalability
- A large community
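To illustrate the "unified data access" point, a minimal sketch (the file paths are placeholders): the same spark.read entry point handles different formats, only the format-specific call changes.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("UnifiedReadDemo").master("local[*]").getOrCreate()
    // one reader API, several source formats
    val jsonDF    = spark.read.json("path/to/data.json")
    val csvDF     = spark.read.option("header", true).csv("path/to/data.csv")
    val parquetDF = spark.read.parquet("path/to/data_parquet")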
4. How to create a DataFrame
Environment setup
IDEA 2020, JDK 1.8, Scala 2.12.12, Maven 3.6.3; pom file:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.1</spark.version>
    <hbase.version>2.2.5</hbase.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
4.1 Creating a DataFrame from an RDD
Using a case class, DSL-style API
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromRDD_CaseClass")
    .master("local[*]")
    .getOrCreate()
  val sparkContext: SparkContext = sparkSession.sparkContext

  // read the raw text file, one date string per line
  val rdd1: RDD[String] = sparkContext.textFile("E:\\DOITLearning\\12.Spark\\date.txt")

  // parse each line into a DateModel case class; one SimpleDateFormat per partition
  val mapedRDD: RDD[DateModel] = rdd1.mapPartitions(iter => {
    val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    iter.map(line => {
      val date: Date = simpleDateFormat.parse(line)
      // note: these java.util.Date getters are deprecated (getYear is year - 1900, getDay is day of week)
      DateModel(date.getYear, date.getMonth, date.getDay)
    })
  })

  // the implicits bring in .toDF() for RDDs of case classes
  import sparkSession.implicits._
  val dataFrame: DataFrame = mapedRDD.toDF()

  // DSL-style queries
  import org.apache.spark.sql.functions._
  dataFrame.select("year", "day").where($"day" >= 5).show()
  val res2: DataFrame = dataFrame.agg(avg("day") as "avg_day")
  res2.show()

  sparkSession.close()
}
Using a case class, SQL style
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameFromRDD_CaseClass {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("DataFrameFromRDD_CaseClass")
      .master("local[*]")
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val rdd1: RDD[String] = sparkContext.textFile("E:\\DOITLearning\\12.Spark\\date.txt")

    val mapedRDD: RDD[DateModel] = rdd1.mapPartitions(iter => {
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      iter.map(line => {
        val date: Date = simpleDateFormat.parse(line)
        DateModel(date.getYear, date.getMonth, date.getDay)
      })
    })

    import sparkSession.implicits._
    val dataFrame: DataFrame = mapedRDD.toDF()

    // register the DataFrame as views so it can be queried with SQL
    dataFrame.createTempView("v_temp_dateinfo")          // session-scoped temp view
    dataFrame.createGlobalTempView("v_global_dateinfo")  // shared across sessions, under global_temp

    dataFrame.printSchema()
    dataFrame.show(5)

    sparkSession.close()
  }
}
case class DateModel(year: Int, month: Int, day: Int)
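The example above registers the views but never queries them. A minimal follow-up sketch (to be run before sparkSession.close()): a session-scoped temp view is queried by name, while a global temp view has to be qualified with the global_temp database.

    // query the session-scoped temp view
    sparkSession.sql("select year, month, day from v_temp_dateinfo where day >= 5").show()
    // query the global temp view through the global_temp database
    sparkSession.sql("select avg(day) as avg_day from global_temp.v_global_dateinfo").show()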
Creating with a StructType
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataframeFromRDD_StructType")
    .master("local")
    .getOrCreate()
  val sparkContext: SparkContext = sparkSession.sparkContext

  val rdd1: RDD[String] = sparkContext.textFile("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")

  // parse each comma-separated line into a generic Row; malformed lines become null and are filtered out
  val mapRDD: RDD[Row] = rdd1.map(line => {
    val strings: Array[String] = line.split(",")
    var row: Row = null
    try {
      val uid: String = strings(0)
      val startDate: String = strings(1)
      val endDate: String = strings(2)
      val netFlow: Double = strings(3).toDouble
      row = Row(uid, startDate, endDate, netFlow)
    } catch {
      case e: Exception => e.printStackTrace()
    }
    row
  }).filter(ele => ele != null)

  // the schema describing the fields inside each Row
  val structType: StructType = StructType(List(
    StructField("uid", DataTypes.StringType),
    StructField("startDate", DataTypes.StringType, false),
    StructField("endDate", DataTypes.StringType, false),
    StructField("netFlow", DataTypes.DoubleType, false)
  ))

  // RDD[Row] + schema => DataFrame
  val dataFrame: DataFrame = sparkSession.createDataFrame(mapRDD, structType)

  dataFrame.printSchema()
  dataFrame.show()

  // keep the application alive so the local Spark Web UI can be inspected
  Thread.sleep(2000000000)

  sparkSession.close()
}
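Once the Row RDD and the schema are combined, the usual DSL operations apply. A small sketch of an aggregation on the DataFrame built above (grouping by uid and summing netFlow is an illustration only, not part of the original example):

    import org.apache.spark.sql.functions._

    // total traffic per user, largest first
    dataFrame.groupBy("uid")
      .agg(sum("netFlow") as "total_flow")
      .orderBy(desc("total_flow"))
      .show()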
Creating with tuples
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataframeFromRDD_Tuple")
    .master("local")
    .getOrCreate()
  val sparkContext: SparkContext = sparkSession.sparkContext

  val rdd1: RDD[String] = sparkContext.textFile("E:\\DOITLearning\\12.Spark\\netflowRollupSourceData.txt")

  // parse each comma-separated line into a 4-element tuple
  val mapRDD: RDD[(String, String, String, Double)] = rdd1.map(line => {
    val strings: Array[String] = line.split(",")
    val uid: String = strings(0)
    val startDate: String = strings(1)
    val endDate: String = strings(2)
    val netFlow: Double = strings(3).toDouble
    (uid, startDate, endDate, netFlow)
  })

  import sparkSession.implicits._
  // without explicit names the columns are called _1, _2, _3, _4
  val dataFrame: DataFrame = mapRDD.toDF()

  dataFrame.printSchema()
  dataFrame.show()

  sparkSession.close()
}
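Because tuples carry no field names, the columns come out as _1 to _4. toDF also accepts explicit names; a minimal sketch:

    // same RDD, but with meaningful column names
    val namedDF: DataFrame = mapRDD.toDF("uid", "startDate", "endDate", "netFlow")
    namedDF.printSchema()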
4.2 Creating a DataFrame from structured files
Creating from a CSV file with a header
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromFile_CSV_Titled")
    .master("local")
    .getOrCreate()

  // explicit schema: the header only supplies names, not types
  val structType: StructType = StructType(List(
    StructField("color", DataTypes.StringType, false),
    StructField("price", DataTypes.DoubleType, false),
    StructField("brand", DataTypes.StringType, false),
    StructField("model", DataTypes.StringType, false)
  ))

  val dataFrame: DataFrame = sparkSession.read
    .option("delimiter", ",")
    .option("header", true)   // the first line is a header
    .schema(structType)
    .csv("E:\\DOITLearning\\12.Spark\\title_csv_src.csv")

  dataFrame.show()

  // write the result out as parquet (reused by the parquet example below)
  dataFrame.write.parquet("E:\\DOITLearning\\12.Spark\\parquet_src")

  sparkSession.close()
}
Creating from a CSV file without a header, approach 1
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromFile_CSV_Untitled")
    .master("local")
    .getOrCreate()

  // without a header Spark names the columns _c0, _c1, ...
  val dataFrame: DataFrame = sparkSession.read
    .option("delimiter", " ")
    .csv("E:\\DOITLearning\\12.Spark\\wc.txt")

  // option A: rename the columns afterwards (toDF returns a new DataFrame, so keep the result)
  val renamedDF: DataFrame = dataFrame.toDF("word1", "word2", "word3", "word4", "word5", "word6", "word7", "word8")
  renamedDF.show()

  // option B: declare a schema up front and pass it to the reader (shown again as approach 2 below)
  val structType: StructType = StructType(List(
    StructField("word1", DataTypes.StringType, false),
    StructField("word2", DataTypes.StringType, false),
    StructField("word3", DataTypes.StringType, false),
    StructField("word4", DataTypes.StringType, false),
    StructField("word5", DataTypes.StringType, false),
    StructField("word6", DataTypes.StringType, true),
    StructField("word7", DataTypes.StringType, true),
    StructField("word8", DataTypes.StringType, true)
  ))
  val schemaDF: DataFrame = sparkSession.read
    .option("delimiter", " ")
    .schema(structType)
    .csv("E:\\DOITLearning\\12.Spark\\wc.txt")
  schemaDF.show()

  sparkSession.close()
}
Creating from a CSV file without a header, approach 2
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromFile_CSV_Untitled")
    .master("local")
    .getOrCreate()

  // declare both column names and types since the file carries no header
  val structType: StructType = StructType(List(
    StructField("word1", DataTypes.StringType, false),
    StructField("word2", DataTypes.StringType, false),
    StructField("word3", DataTypes.StringType, false),
    StructField("word4", DataTypes.StringType, false),
    StructField("word5", DataTypes.StringType, false),
    StructField("word6", DataTypes.StringType, true),
    StructField("word7", DataTypes.StringType, true),
    StructField("word8", DataTypes.StringType, true)
  ))

  val dataFrame: DataFrame = sparkSession.read
    .option("delimiter", " ")
    .schema(structType)
    .csv("E:\\DOITLearning\\12.Spark\\wc.txt")

  dataFrame.show()

  sparkSession.close()
}
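As an alternative to writing the StructType by hand, the CSV reader can sample the file and infer the column types, at the cost of an extra pass over the data (and the generic _c0, _c1, ... names unless you rename them). A minimal sketch:

    val inferredDF: DataFrame = sparkSession.read
      .option("delimiter", " ")
      .option("inferSchema", true)  // sample the data to guess column types
      .csv("E:\\DOITLearning\\12.Spark\\wc.txt")
    inferredDF.printSchema()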
Creating from a JSON file
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromFile_JSON")
    .master("local")
    .getOrCreate()

  // the JSON reader infers the schema (field names and types) from the documents
  val dataFrame: DataFrame = sparkSession.read
    .json("E:\\DOITLearning\\12.Spark\\json_test_src2.json")

  dataFrame.show()

  // register a temp view so the data can also be queried with SQL
  dataFrame.createTempView("v_test")

  // DSL-style query; column resolution is case-insensitive by default, so firstname matches firstName
  dataFrame.select("email", "firstName").where("firstname='Brett'").show()

  sparkSession.close()
}
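Since v_test has been registered, the same filter can also be written in SQL. A minimal sketch, to be run before sparkSession.close():

    sparkSession.sql("select email, firstName from v_test where firstName = 'Brett'").show()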
Creating from a parquet file
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFromFile_Parquet")
    .master("local")
    .getOrCreate()

  // parquet files carry their own schema, so nothing needs to be declared here
  val dataFrame: DataFrame = sparkSession.read
    .parquet("E:\\DOITLearning\\12.Spark\\parquet_src")

  dataFrame.printSchema()
  dataFrame.show()

  dataFrame.select("price").where("price > 300").show()

  sparkSession.close()
}
Note that parquet is the special case here: the files carry their own schema and column metadata, so no schema needs to be supplied, and they come with compression and columnar storage built in. This is why parquet is the file format Spark recommends most.
4.3 Creating a DataFrame by reading from external servers
Creating via JDBC
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameFrom_JDBC")
    .master("local")
    .getOrCreate()

  // connection credentials; the mysql-connector-java dependency in the pom provides the driver
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "123456")

  // read the whole tb_user table; the schema comes from the table metadata
  val dataFrame: DataFrame = sparkSession.read
    .jdbc("jdbc:mysql://localhost:3306/db_demo1?characterEncoding=utf-8", "tb_user", properties)

  dataFrame.show()

  sparkSession.close()
}
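For larger tables, the jdbc reader has an overload that splits the read across partitions by a numeric column. A hedged sketch, assuming tb_user has a numeric id column roughly in the range 1 to 10000 (both the column and the range are assumptions):

    val partitionedDF: DataFrame = sparkSession.read.jdbc(
      "jdbc:mysql://localhost:3306/db_demo1?characterEncoding=utf-8",
      "tb_user",
      "id",    // partition column, assumed to exist and be numeric
      1L,      // lower bound of id (assumption)
      10000L,  // upper bound of id (assumption)
      4,       // number of parallel partitions/connections
      properties)
    partitionedDF.show()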
4.4 Summary
A DataFrame can be viewed as RDD + schema: as long as you have data plus its structural information, you can turn it into a DataFrame. A DataFrame can also be viewed as a specialised Dataset (Dataset[Row]), precisely because it carries that structural information. In big data processing, unstructured and semi-structured data is usually converted into structured data before further processing, because:
- structured data can be processed with SQL, which keeps the barrier to entry and the difficulty low;
- structured data processing is the most mature area, so existing optimisations, implementation ideas, and even ready-made third-party libraries can be reused directly;
- structured data also matches the way people think, which makes development and use more convenient.