什么时候会用到 Parquet ?
在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.为了能够保存比较复杂的数据, 并且保证性能和压缩率, 通常使用 Parquet 是一个比较不错的选择.所以外部系统收集过来的数据, 有可能会使用 Parquet, 而 Spark 进行读取和转换的时候, 就需要支持对 Parquet 格式的文件的支持.使用代码读写 Parquet 文件 默认不指定 format 的时候, 默认就是读写 Parquet 格式的文件
import org.apache.spark.sql.{DataFrame, SparkSession} /** * DESC: */ object SparkToParquet { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName("hello") .master("local[6]") .getOrCreate() val df = spark.read .option("header", value = true) .csv("./datasets/input/911.csv") // 保存 Parquet 文件 df.write.mode("overwrite").save("./datasets/output/911.parquet") // 读取 Parquet 文件val dfFromParquet = spark.read.parquet("dataset/911.parquet") val dfSchema: DataFrame = spark.read.parquet("./datasets/output/911.parquet") dfSchema.createOrReplaceTempView("test911") spark.sql("select * from test911 where zip > 19000 and zip < 19400").show() } }写入 Parquet 的时候可以指定分区 Spark 在写入文件的时候是支持分区的, 可以像 Hive 一样设置某个列为分区列
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() // 从 CSV 中读取内容val dfFromParquet = spark.read.option("header", value = true).csv("dataset/BeijingPM20100101_20151231.csv") // 保存为 Parquet 格式文件, 不指定 format 默认就是 Parquet dfFromParquet.write.partitionBy("year", "month").save("dataset/beijing_pm")注意:这个地方指的分区是类似 Hive 中表分区的概念, 而不是 RDD 分布式分区的含义。
分区发现 在读取常见文件格式的时候, Spark 会自动的进行分区发现, 分区自动发现的时候, 会将文件名中的分区信息当作一列. 例如 如果按照性别分区, 那么一般会生成两个文件夹 gender=male 和 gender=female, 那么在使用 Spark 读取的时候, 会自动发现这个分区信息, 并且当作列放入创建的 DataFrame 中 使用代码证明这件事可以有两个步骤, 第一步先读取某个分区的单独一个文件并打印其 Schema 信息, 第二步读取整个数据集所有分区并打印 Schema 信息, 和第一步做比较就可以确定
val spark = ... val partDF = spark.read.load("dataset/beijing_pm/year=2010/month=1") partDF.printSchema()把分区的数据集中的某一个区单做一整个数据集读取, 没有分区信息, 自然也不会进行分区发现
val df = spark.read.load("dataset/beijing_pm") df.printSchema()此处读取的是整个数据集, 会进行分区发现, DataFrame 中会包含分去列 Table 1. SparkSession 中有关 Parquet 的配置
配置默认值含义spark.sql.parquet.binaryAsStringfalse一些其他 Parquet 生产系统, 不区分字符串类型和二进制类型, 该配置告诉 SparkSQL 将二进制数据解释为字符串以提供与这些系统的兼容性spark.sql.parquet.int96AsTimestamptrue一些其他 Parquet 生产系统, 将 Timestamp 存为 INT96, 该配置告诉 SparkSQL 将 INT96 解析为 Timestampspark.sql.parquet.cacheMetadatatrue打开 Parquet 元数据的缓存, 可以加快查询静态数据spark.sql.parquet.compression.codecsnappy压缩方式, 可选 uncompressed, snappy, gzip, lzospark.sql.parquet.mergeSchemafalse当为 true 时, Parquet 数据源会合并从所有数据文件收集的 Schemas 和数据, 因为这个操作开销比较大, 所以默认关闭spark.sql.optimizer.metadataOnlytrue如果为 true, 会通过原信息来生成分区列, 如果为 false 则就是通过扫描整个数据集来确定