sparksql简介sparksql中DataFrame和DataSet的数据结构sparksql中DataFrame和DataSet的使用方式
1.sparksql概述
1.1 sparksql的前世今生
Shark是专门针对于spark的构建大规模数据仓库系统的一个框架Shark与Hive兼容、同时也依赖于Spark版本Hivesql底层把sql解析成了mapreduce程序,Shark是把sql语句解析成了Spark任务随着性能优化的上限,以及集成SQL的一些复杂的分析功能,发现Hive的MapReduce思想限制了Shark的发展。最后Databricks公司终止对Shark的开发
决定单独开发一个框架,不在依赖hive,把重点转移到了sparksql这个框架上。
1.2 什么是sparksql
Spark SQL is Apache Spark’s module for working with structured data.SparkSQL是apache Spark用来处理结构化数据的一个模块
2. sparksql的四大特性
1、易整合
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X1pSAq4V-1602159306282)(spark_day05课程设计.assets/)]
将SQL查询与Spark程序无缝混合可以使用不同的语言进行代码开发
javascalapythonR
2、统一的数据源访问
以相同的方式连接到任何数据源
sparksql后期可以采用一种统一的方式去对接任意的外部数据源
SparkSession
.read
.该数据类型的方法名
(该格式数据的路径
)
3、兼容hive
sparksql可以支持hivesql这种语法 sparksql兼容hivesql
4、支持标准的数据库连接
sparksql支持标准的数据库连接JDBC或者ODBC
3. DataFrame概述
3.1 DataFrame发展
DataFrame前身是schemaRDD,这个schemaRDD是直接继承自RDD,它是RDD的一个实现类在spark1.3.0之后把schemaRDD改名为DataFrame,它不在继承自RDD,而是自己实现RDD上的一些功能也可以把dataFrame转换成一个rdd,调用rdd这个方法
例如 val rdd1=dataFrame.rdd
3.2 DataFrame是什么
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化DataFrame可以从很多数据源构建
比如:已经存在的RDD、结构化文件、外部数据库、Hive表。
DataFrame = RDD + schema元信息(对数据的结构描述信息)
DataFrame可以看成是一张mysql表。
表中有数据,同时表中还有字段的名称和类型,这里的字段的名称和类型就可以理解成Schema信息
3.3 DataFrame和RDD的优缺点
1、RDD
优点
1、编译时类型安全
开发会进行类型检查,在编译的时候及时发现错误 2、具有面向对象编程的风格
缺点
1、构建大量的java对象占用了大量heap堆空间,导致频繁的GC
由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC),程序在进行垃圾回收的过程中,所有的任务都是暂停。影响程序执行的效率
2、数据的序列化和反序列性能开销很大 在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输,然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
2、DataFrame
DataFrame引入了schema元信息和off-heap(堆外)优点
1、DataFrame引入off-heap,大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存,这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高,它是解决了RDD构建大量的java对象占用了大量heap堆空间,导致频繁的GC这个缺点。
2、DataFrame引入了schema元信息—就是数据结构的描述信息,后期spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉。这样一来数据网络传输的数据量是有所减少,数据的序列化和反序列性能开销就不是很大了。它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
缺点
DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
1、编译时类型不安全
编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现 2、不在具有面向对象编程的风格
4. 读取文件构建DataFrame
4.1 读取文本文件创建DataFrame
第一种方式
val rdd1
=sc
.textFile
("/person.txt").map
(x
=>x
.split
(" "))
case class Person
(id
:String,name
:String,age
:Int)
val personRDD
=rdd1
.map
(x
=>Person
(x
(0),x
(1),x
(2).toInt
))
val personDF
=personRDD
.toDF
personDF
.printSchema
personDF
.show
第二种方式
val personDF
=spark
.read
.text
("/person.txt")
personDF
.printSchema
personDF
.show
4.2 读取json文件创建DataFrame
val peopleDF
=spark
.read
.json
("/people.json")
peopleDF
.printSchema
peopleDF
.show
4.3 读取parquet文件创建DataFrame
val usersDF
=spark
.read
.parquet
("/users.parquet")
usersDF
.printSchema
usersDF
.show
5. DataFrame常用操作
5.1 DSL风格语法
就是sparksql中的DataFrame自身提供了一套自己的Api,可以去使用这套api来做相应的处理
val rdd1
=sc
.textFile
("/person.txt").map
(x
=>x
.split
(" "))
case class Person
(id
:String,name
:String,age
:Int)
val personRDD
=rdd1
.map
(x
=>Person
(x
(0),x
(1),x
(2).toInt
))
val personDF
=personRDD
.toDF
personDF
.printSchema
personDF
.show
personDF
.select
("name").show
personDF
.select
($
"name").show
personDF
.select
(col
("name").show
personDF
.select
($
"name",$
"age",$
"age"+1).show
personDF
.filter
($
"age" > 30).show
personDF
.groupBy
("age").count
.show
personDF
.groupBy
("age").count
().sort
($
"count".desc
)show
5.2 SQL风格语法
可以把DataFrame注册成一张表,然后通过sparkSession.sql(sql语句)操作
personDF
.createTempView
("person")
spark
.sql
("select * from person").show
spark
.sql
("select name from person").show
spark
.sql
("select name,age from person").show
spark
.sql
("select * from person where age >30").show
spark
.sql
("select count(*) from person where age >30").show
spark
.sql
("select age,count(*) from person group by age").show
spark
.sql
("select age,count(*) as count from person group by age").show
spark
.sql
("select * from person order by age desc").show
6. DataSet概述
6.1 DataSet是什么
DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
6.2 RDD、DataFrame、DataSet的区别
假设RDD中的两行数据长这样
那么DataFrame中的数据长这样
Dataset中的数据长这样
或者长这样(每行数据是个Object)
DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。
(1)DataSet可以在编译时检查类型
(2)并且是面向对象的编程接口
6.3 DataFrame与DataSet互相转换
1、把一个DataFrame转换成DataSet
val dataSet=dataFrame.as[强类型]
2、把一个DataSet转换成DataFrame
val dataFrame=dataSet.toDF
补充说明
可以从dataFrame和dataSet获取得到rdd
val rdd1=dataFrame.rddval rdd2=dataSet.rdd
6.4 构建DataSet
1、 通过sparkSession调用createDataset方法
val ds
=spark
.createDataset
(1 to
10)
val ds
=spark
.createDataset
(sc
.textFile
("/person.txt"))
2、使用scala集合和rdd调用toDS方法
sc
.textFile
("/person.txt").toDS
List
(1,2,3,4,5).toDS
3、把一个DataFrame转换成DataSet
val dataSet=dataFrame.as[强类型]
4、通过一个DataSet转换生成一个新的DataSet
List
(1,2,3,4,5).toDS
.map
(x
=>x
*10)
7. 通过IDEA开发程序实现把RDD转换DataFrame
添加依赖
<dependency>
<groupId>org.apache.spark
</groupId>
<artifactId>spark-sql_2.11
</artifactId>
<version>2.3.3
</version>
</dependency>
7.1 利用反射机制
定义一个样例类,后期直接映射成DataFrame的schema信息代码开发
package cn
.linann
.sql
import org
.apache
.spark
.SparkContext
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.sql
.{Column
, DataFrame
, Row
, SparkSession
}
case class Person
(id
:String,name
:String,age
:Int)
object CaseClassSchema
{
def main
(args
: Array
[String]): Unit = {
val spark
: SparkSession
= SparkSession
.builder
().appName
("CaseClassSchema").master
("local[2]").getOrCreate
()
val sc
: SparkContext
= spark
.sparkContext
sc
.setLogLevel
("warn")
val data
: RDD
[Array
[String]] = sc
.textFile
("E:\\person.txt").map
(x
=>x
.split
(" "))
val personRDD
: RDD
[Person
] = data
.map
(x
=>Person
(x
(0),x
(1),x
(2).toInt
))
import spark
.implicits
._
val personDF
: DataFrame
= personRDD
.toDF
personDF
.printSchema
()
personDF
.show
()
val first
: Row
= personDF
.first
()
println
("first:"+first
)
val top3
: Array
[Row
] = personDF
.head
(3)
top3
.foreach
(println
)
personDF
.select
("name").show
()
personDF
.select
($
"name").show
()
personDF
.select
(new Column
("name")).show
()
personDF
.select
("name","age").show
()
personDF
.select
($
"name",$
"age",$
"age"+1).show
()
personDF
.filter
($
"age" >30).show
()
val count
: Long = personDF
.filter
($
"age" >30).count
()
println
("count:"+count
)
personDF
.groupBy
("age").count
().show
()
personDF
.show
()
personDF
.foreach
(row
=> println
(row
))
personDF
.foreach
(row
=>println
(row
.getAs
[String]("name")))
personDF
.foreach
(row
=>println
(row
.get
(1)))
personDF
.foreach
(row
=>println
(row
.getString
(1)))
personDF
.foreach
(row
=>println
(row
.getAs
[String](1)))
personDF
.createTempView
("person")
spark
.sql
("select * from person").show
spark
.sql
("select name from person").show
spark
.sql
("select name,age from person").show
spark
.sql
("select * from person where age >30").show
spark
.sql
("select count(*) from person where age >30").show
spark
.sql
("select age,count(*) from person group by age").show
spark
.sql
("select age,count(*) as count from person group by age").show
spark
.sql
("select * from person order by age desc").show
spark
.stop
()
}
}
7.2 通过StructType直接指定Schema
代码开发
package cn
.linann
.sql
import org
.apache
.spark
.SparkContext
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.sql
.types
.{IntegerType
, StringType
, StructField
, StructType
}
import org
.apache
.spark
.sql
.{DataFrame
, Row
, SparkSession
}
object StructTypeSchema
{
def main
(args
: Array
[String]): Unit = {
val spark
: SparkSession
= SparkSession
.builder
().appName
("StructTypeSchema").master
("local[2]").getOrCreate
()
val sc
: SparkContext
= spark
.sparkContext
sc
.setLogLevel
("warn")
val data
: RDD
[Array
[String]] = sc
.textFile
("E:\\person.txt").map
(x
=>x
.split
(" "))
val rowRDD
: RDD
[Row
] = data
.map
(x
=>Row
(x
(0),x
(1),x
(2).toInt
))
val schema
=StructType
(
StructField
("id",StringType
)::
StructField
("name",StringType
)::
StructField
("age",IntegerType
)::Nil
)
val dataFrame
: DataFrame
= spark
.createDataFrame
(rowRDD
,schema
)
dataFrame
.printSchema
()
dataFrame
.show
()
dataFrame
.createTempView
("user")
spark
.sql
("select * from user").show
()
spark
.stop
()
}
}
8、sparksql 操作hivesql
添加依赖
<dependency>
<groupId>org.apache.spark
</groupId>
<artifactId>spark-hive_2.11
</artifactId>
<version>2.3.3
</version>
</dependency>
代码开发
package cn
.linann
.sql
import org
.apache
.spark
.sql
.SparkSession
object HiveSupport
{
def main
(args
: Array
[String]): Unit = {
val spark
: SparkSession
= SparkSession
.builder
()
.appName
("HiveSupport")
.master
("local[2]")
.enableHiveSupport
()
.getOrCreate
()
spark
.sql
("create table people(id string,name string,age int) row format delimited fields terminated by ','")
spark
.sql
("load data local inpath './data/11.txt' into table people ")
spark
.sql
("select * from people").show
()
spark
.stop
()
}
}