1. Creating a DataFrame from an RDD + case class
package com.doit.spark.day10

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object CaseClassCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")
    val userRDD: RDD[User] = lineRDD.map(x => {
      val arr: Array[String] = x.split(",")
      val name: String = arr(0)
      val age: Int = arr(1).toInt
      val fv: Double = arr(2).toDouble
      User(name, age, fv)
    })
    val dataFrame: DataFrame = sparkSession.createDataFrame(userRDD)
    dataFrame.printSchema()
    dataFrame.show()
  }
}

case class User(name: String, age: Int, fv: Double)
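All of the RDD-based examples in these notes parse the same comma-separated user.txt. The file itself is not included here; judging from the split(",") logic and the values that appear in the JSON output further down, a minimal version of it (my assumption, not the actual file) would look something like:

laozhao,18,9999.99
laoduan,28,999.99
nianhang,20,999.99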
2. Creating a DataFrame from a Row RDD + StructType
package com.doit.spark.day10

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataStructTypeCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")
    val rowRDD: RDD[Row] = lineRDD.map(x => {
      val arr: Array[String] = x.split(",")
      val name: String = arr(0)
      val age: Int = arr(1).toInt
      val fv: Double = arr(2).toDouble
      Row(name, age, fv)
    })
    // The third argument of add() is the nullable flag for the field
    val structType = new StructType()
      .add("name", DataTypes.StringType, false)
      .add("age", DataTypes.IntegerType, true)
      .add("fv", DataTypes.DoubleType, true)
    val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
    dataFrame.printSchema()
    dataFrame.show()
  }
}
3. Creating a DataFrame from an RDD with toDF
package com.doit.spark.day10

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RDDtoDFCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")
    val tupleRDD: RDD[(String, Int, Double)] = lineRDD.map(x => {
      val arr: Array[String] = x.split(",")
      val name: String = arr(0)
      val age: Int = arr(1).toInt
      val fv: Double = arr(2).toDouble
      (name, age, fv)
    })
    // toDF on an RDD of tuples is provided by the SparkSession's implicit conversions
    import sparkSession.implicits._
    val dataFrame: DataFrame = tupleRDD.toDF("name", "age", "fv")
    dataFrame.printSchema()
    dataFrame.show()
  }
}
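The import sparkSession.implicits._ line is what brings toDF into scope for an RDD of tuples. As a side note that is not part of the original notes, the same implicit conversion also works on a local Seq of tuples, which is handy for quick experiments; the object name and the sample values below are my own:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ToDFFromSeqSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ToDFFromSeqSketch").master("local[*]").getOrCreate()
    import spark.implicits._
    // A local Seq of tuples becomes a DataFrame the same way; without explicit
    // column names the columns would default to _1, _2, _3
    val df: DataFrame = Seq(("laozhao", 18, 9999.99), ("laoduan", 28, 999.99)).toDF("name", "age", "fv")
    df.printSchema()
    df.show()
    spark.stop()
  }
}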
4. Reading JSON data into a DataFrame
package com.doit.spark.day10

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadJSONCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    // Shorthand JSON reader
    val dataFrame: DataFrame = sparkSession.read.json("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")
    // Equivalent long form: specify the format explicitly and load the path
    val dataFrame1: DataFrame = sparkSession.read.format("json").load("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")
    dataFrame1.createTempView("v_user")
    dataFrame1.printSchema()
    // Malformed lines end up in the automatically generated _corrupt_record column
    sparkSession.sql("select * from v_user where _corrupt_record is NOT NULL").show()
  }
}
When reading JSON data, Spark triggers an Action before any processing and scans the whole dataset in order to build the table's schema. Because each JSON line may contain a different set of fields, the full scan is necessary: every field that appears anywhere becomes a column of the table, and rows that lack a field get null for it. Spark also infers each column's data type automatically while reading JSON. If the data contains malformed (dirty) records, a dedicated column (_corrupt_record) is generated and all the malformed records are placed in it. With the user.json used above, the full DataFrame, its schema, and the result of the _corrupt_record query look like this:
+--------------------+----+-------+------+------+--------+
|     _corrupt_record| age|     fv|gender|height|    name|
+--------------------+----+-------+------+------+--------+
|                null|  18|9999.99|  null|  null| laozhao|
|                null|  28| 999.99|  null|  null| laoduan|
|                null|  20| 999.99|  null|  null|nianhang|
|{"name": "heihei"...|null|   null|  null|  null|    null|
|                null|  20| 999.99|  male|  null|    nana|
|                null|null|   null|  null| 180.2|    test|
|                null|  18|9999.99|  null|  null| laozhao|
|                null|  28| 999.99|  null|  null| laoduan|
|                null|  20| 999.99|  null|  null|nianhang|
+--------------------+----+-------+------+------+--------+
root
 |-- _corrupt_record: string (nullable = true)
 |-- age: long (nullable = true)
 |-- fv: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: double (nullable = true)
 |-- name: string (nullable = true)
+--------------------+----+----+------+------+----+
|     _corrupt_record| age|  fv|gender|height|name|
+--------------------+----+----+------+------+----+
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
+--------------------+----+----+------+------+----+
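One way to avoid the schema-inference scan described above is to declare the schema up front when reading the JSON. This is a minimal sketch of my own (the field names follow the output above, the types follow what Spark would normally infer), not code from the original notes:

import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadJSONWithSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadJSONWithSchemaSketch").master("local[*]").getOrCreate()
    // Declaring the schema up front skips the inference pass over the whole file
    val schema = new StructType()
      .add("name", DataTypes.StringType, true)
      .add("age", DataTypes.LongType, true)
      .add("fv", DataTypes.DoubleType, true)
    val df: DataFrame = spark.read
      .schema(schema)
      .json("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")
    df.printSchema()
    df.show()
    spark.stop()
  }
}

Note that with an explicit schema the _corrupt_record column is only produced if it is declared as a string field in that schema.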
5. Reading CSV data into a DataFrame
package com.doit.spark.day10

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    // No header: let Spark infer the types, then assign column names with toDF
    val dataFrame: DataFrame = sparkSession.read
      .option("inferSchema", "true")
      .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")
      .toDF("name", "age", "fv")
    // With a header row: take the column names from the header and infer the types
    val dataFrame1: DataFrame = sparkSession.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")
    dataFrame.printSchema()
    dataFrame.show()
  }
}
When reading a CSV file, reading the header triggers one Action, and automatic type inference triggers another one that scans the whole dataset, because Spark has to check that each field of every row has a consistent data type. Automatic inference therefore wastes a lot of runtime, so the best approach is to specify a StructType manually, as the following example shows.
package com.doit.spark.day10

import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    // Manually specified schema: no inference pass over the data is needed
    val structType = new StructType()
      .add("name", DataTypes.StringType, false)
      .add("age", DataTypes.DoubleType, true)
      .add("fv", DataTypes.DoubleType, true)
    val dataFrame2: DataFrame = sparkSession.read
      .option("header", "true")
      .schema(structType)
      .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")
    dataFrame2.printSchema()
    dataFrame2.show()
  }
}
When the data has a header row, use .option("header", "true") so the header is consumed as column names rather than read as data.
In CSV data, a row with one field too many is ignored automatically, while a row with missing fields has the gaps filled with null.
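How such malformed rows are handled can also be controlled explicitly through the CSV reader's mode option (PERMISSIVE keeps them, DROPMALFORMED drops them, FAILFAST aborts the read). A small sketch of my own, reusing the manually specified schema from the example above:

import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadCSVModeSketch").master("local[*]").getOrCreate()
    val structType = new StructType()
      .add("name", DataTypes.StringType, false)
      .add("age", DataTypes.DoubleType, true)
      .add("fv", DataTypes.DoubleType, true)
    // mode controls what happens to rows that do not match the schema:
    // PERMISSIVE (default) keeps them with nulls, DROPMALFORMED drops them, FAILFAST throws
    val df: DataFrame = spark.read
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .schema(structType)
      .csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")
    df.show()
    spark.stop()
  }
}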
6. Reading Parquet data into a DataFrame
Parquet is a columnar storage format. It is more compact than plain-text formats and supports compression, and when a SQL query only touches certain columns, Spark does not need to scan the whole table, which greatly speeds up retrieval. Parquet files also carry their own schema (field names, field types, field indexes, and so on), which makes Parquet Spark's preferred file format.
A Parquet file cannot be written by hand, however; it can only be generated by the system:
package com.doit.spark.day10

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object CreateParquetDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val dataFrame: DataFrame = sparkSession.read.json("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")
    // Default save mode (ErrorIfExists): fails if the output path already exists
    dataFrame.write.parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")
    // Append: add new files to the existing output path
    dataFrame.write.mode(SaveMode.Append).parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")
    // Overwrite and Ignore are the other save modes; note that these two lines only
    // configure a DataFrameWriter -- nothing is written until a terminal call such as parquet(...) is made
    dataFrame.write.mode(SaveMode.Overwrite)
    dataFrame.write.mode(SaveMode.Ignore)
    // Read the Parquet files back; the schema comes from the files themselves
    val dataFrame1: DataFrame = sparkSession.read.parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")
    dataFrame1.printSchema()
    dataFrame1.show()
  }
}
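The column-pruning benefit mentioned at the start of this section is easy to observe from the reading side: selecting a single column from a Parquet source only reads that column, not the whole table. A short sketch of my own, reusing the path written above:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ParquetColumnPruningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ParquetColumnPruningSketch").master("local[*]").getOrCreate()
    // Only the "name" column is read from the Parquet files, not the whole table
    val names: DataFrame = spark.read
      .parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")
      .select("name")
    names.explain() // the physical plan's ReadSchema shows just the pruned column
    names.show()
    spark.stop()
  }
}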
7. Reading and writing data over JDBC
package com.doit.spark.day10

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object JDBCAndDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    // Connection properties for MySQL
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    // Read the whole student table into a DataFrame
    val dataFrame: DataFrame = sparkSession.read
      .jdbc("jdbc:mysql://localhost:3306/db_demo3?characterEncoding=UTF-8", "student", properties)
    dataFrame.printSchema()
    dataFrame.show()
    // Write the DataFrame back, appending to the same table
    dataFrame.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/db_demo3?characterEncoding=UTF-8", "student", properties)
  }
}
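The same JDBC source can also be read in option style, which makes it easy to push a filter down to MySQL as a subquery instead of pulling the whole table. The subquery below (and the assumption that student has an age column) is my own illustration on top of the notes above, using the standard Spark JDBC options:

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCOptionStyleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JDBCOptionStyleSketch").master("local[*]").getOrCreate()
    // Option-style JDBC read; "dbtable" accepts either a table name or a subquery with an alias,
    // so the filter below is executed by MySQL rather than by Spark
    val df: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/db_demo3?characterEncoding=UTF-8")
      .option("dbtable", "(select * from student where age > 18) as t")
      .option("user", "root")
      .option("password", "123456")
      .load()
    df.printSchema()
    df.show()
    spark.stop()
  }
}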