Spark SQL: Saving DataFrame Data

    Technology  2022-08-17


    1. Background

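    Spark SQL is Spark's module for processing structured data. Besides being usable directly through SQL, it provides a higher-level abstraction than the RDD: the DataFrame. A DataFrame is also an abstract data collection, but unlike an RDD it carries schema information that describes the structure of the data, so a DataFrame can be thought of as an RDD plus a schema.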
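    To make the "RDD + schema" view concrete, the sketch below builds a small DataFrame and pulls out both halves: df.rdd returns the rows as an RDD[Row], and df.schema returns the StructType that describes them. The object name RddPlusSchemaDemo and the User case class are hypothetical, chosen only for illustration, and the sketch assumes the spark-sql dependency from the pom is on the classpath.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Hypothetical demo object: a DataFrame exposes its rows (RDD[Row]) and its schema (StructType)
object RddPlusSchemaDemo {
  // Hypothetical record type used only for illustration
  case class User(name: String, age: Int)

  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // toDF derives the column names and types from the case class fields
    Seq(User("xixi", 10), User("haha", 20)).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RddPlusSchemaDemo").getOrCreate()
    val df = build(spark)
    val rows: RDD[Row] = df.rdd        // the "RDD" half
    val schema: StructType = df.schema // the "schema" half
    schema.printTreeString()
    println(rows.count())
    spark.close()
  }
}
```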

    2. Ways to save a DataFrame

    Environment setup

    IDEA 2020, JDK 1.8, Scala 2.12.12, Maven 3.6.3. The pom file:

```xml
<!-- Constants used below -->
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.1</spark.version>
    <hbase.version>2.2.5</hbase.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!-- provided: available at compile time but not bundled into the jar -->
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
        <!-- provided: available at compile time but not bundled into the jar -->
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Plugin for building the jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
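    The environment list says Scala 2.12.12 while the pom pins scala.version to 2.12.10. A quick way to see which versions actually end up on the classpath is a tiny check like the following; EnvCheck is a hypothetical name, and the sketch only reads scala.util.Properties.versionNumberString (from scala-library) and org.apache.spark.SPARK_VERSION (from spark-core).

```scala
// Hypothetical helper: reports the Scala and Spark versions found on the classpath,
// which should agree with <scala.version> and <spark.version> in the pom
object EnvCheck {
  def versions(): (String, String) =
    (scala.util.Properties.versionNumberString, org.apache.spark.SPARK_VERSION)

  def main(args: Array[String]): Unit = {
    val (scalaVersion, sparkVersion) = versions()
    println(s"Scala $scalaVersion, Spark $sparkVersion")
  }
}
```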

    2.1 Saving to MySQL via JDBC

```scala
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object DataFrame_SaveToMySQL {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode)
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("DataFrame_SaveToMySQL")
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val rdd1: RDD[(String, (String, Int))] = sparkContext.makeRDD(List(
      ("name", ("xixi", 10)), ("name", ("haha", 20)), ("name", ("licha", 30))))
    // Convert each element into a Row, the basic unit a DataFrame is built from
    val rdd2: RDD[Row] = rdd1.map(ele => Row(ele._2._1, ele._2._2))
    // The schema: column names, types and nullability
    val structType: StructType = StructType(List(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)
    ))
    // Combine the RDD and the schema into a DataFrame
    val dataFrame: DataFrame = sparkSession.createDataFrame(rdd2, structType)
    dataFrame.show()

    dataFrame.createTempView("v_user")
    val dataFrame1: DataFrame = sparkSession.sql("select name, age from v_user where age > 5")

    // In production, use a lower-privileged account that can read and write
    // but not delete data, rather than root
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    // If the target table does not exist, Spark's JDBC writer creates it from the
    // DataFrame schema, but without a primary key. Creating the table beforehand is
    // better: it can have a primary key and matches typical production practice.
    dataFrame1
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/db_demo1?characterEncoding=utf-8", "tb_user_dataframe2", properties)

    sparkSession.close()
  }
}
```
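    The same write can also be expressed through the generic format("jdbc") API, passing the connection settings as options instead of a Properties object. This is a minimal sketch under assumptions: JdbcOptionsWriter and its two methods are hypothetical names, com.mysql.jdbc.Driver is the driver class of Connector/J 5.1.x (the version in the pom), and the batchsize value simply restates Spark's default.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: JDBC write via format("jdbc").options(...) instead of write.jdbc(url, table, props)
object JdbcOptionsWriter {
  // Collects the writer options; url, table, user and password come from the caller
  def jdbcOptions(url: String, table: String, user: String, password: String): Map[String, String] =
    Map(
      "url" -> url,
      "dbtable" -> table,
      "user" -> user,
      "password" -> password,
      // Driver class of MySQL Connector/J 5.1.x
      "driver" -> "com.mysql.jdbc.Driver",
      // Rows sent per JDBC batch insert (1000 is Spark's default)
      "batchsize" -> "1000"
    )

  def write(df: DataFrame, options: Map[String, String]): Unit =
    df.write
      .format("jdbc")
      .options(options)
      .mode(SaveMode.Append)
      .save()
}
```

    With the values used above it would be called as JdbcOptionsWriter.write(dataFrame1, JdbcOptionsWriter.jdbcOptions("jdbc:mysql://localhost:3306/db_demo1?characterEncoding=utf-8", "tb_user_dataframe2", "root", "123456")). If you do let Spark create the table, the createTableColumnTypes option can override the column types it generates (for example "name VARCHAR(64)"); it has no effect when the table already exists.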

    2.2 Saving to Parquet files (other supported formats such as JSON and CSV work the same way)

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object Dataframe_SaveToParquet {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Dataframe_SaveToParquet")
      .master("local")
      .getOrCreate()

    val structType: StructType = StructType(List(
      StructField("start_ip", DataTypes.StringType, false),
      StructField("end_ip", DataTypes.StringType, false),
      StructField("start_ip_num", DataTypes.LongType, false),
      StructField("end_ip_num", DataTypes.LongType, false),
      StructField("continent", DataTypes.StringType, false),
      StructField("country", DataTypes.StringType, false),
      StructField("province", DataTypes.StringType, true),
      StructField("city", DataTypes.StringType, true),
      StructField("district", DataTypes.StringType, true),
      StructField("operator", DataTypes.StringType, true),
      StructField("operator_code", DataTypes.IntegerType, true),
      StructField("country_en", DataTypes.StringType, true),
      StructField("country_code", DataTypes.StringType, true),
      StructField("longitude", DataTypes.DoubleType, true),
      StructField("latitude", DataTypes.DoubleType, true)
    ))

    // Sample input line (fields separated by "|"):
    // 1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
    // Input file: E:\DOITLearning\12.Spark\ip_location_dict.txt
    val dataFrame: DataFrame = sparkSession
      .read
      .option("delimiter", "|")
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\ip_location_dict.txt")

    // The data can be repartitioned before writing; with 2 partitions Spark
    // typically writes 2 part files
    dataFrame.repartition(2)
      .write
      .mode(SaveMode.Append)
      .parquet("E:\\DOITLearning\\12.Spark\\parquet_out2")

    sparkSession.close()
  }
}
```
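    The example above uses SaveMode.Append, so running it twice adds a second set of part files to parquet_out2 instead of replacing the first. The sketch below, assuming a local SparkSession and a hypothetical output directory (ParquetRoundTrip is also a hypothetical name), writes with SaveMode.Overwrite and reads the result back; because Parquet files carry their own column names and types, no schema has to be supplied when reading.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical helper: write a DataFrame as Parquet, then read it back
object ParquetRoundTrip {
  def roundTrip(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    // Overwrite replaces whatever is already at `path`;
    // Append would add new part files next to the old ones on every run
    df.write.mode(SaveMode.Overwrite).parquet(path)
    // Column names and types are stored in the Parquet files, so no .schema(...) is needed
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("ParquetRoundTrip").getOrCreate()
    import spark.implicits._
    val df = Seq(("1.2.2.0", 16908800L), ("1.2.2.255", 16909055L)).toDF("ip", "ip_num")
    // Hypothetical output directory
    val back = roundTrip(spark, df, "parquet_roundtrip_out")
    back.printSchema()
    back.show()
    spark.close()
  }
}
```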

    2.3 Summary

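    A DataFrame is an abstract data collection, similar to a table in a database: once data is structured, it is much easier to query and analyze. After the data has been processed with DataFrame operations, the results need to be saved, and saving falls roughly into two categories. The first is saving to files: Spark natively supports many file formats (Parquet, JSON, CSV and others), so results can be written directly in the corresponding format, to either a local file system or a distributed file system such as HDFS. The second is saving to a database through JDBC; any database that provides a JDBC driver can be targeted, so this is not limited to MySQL. Once a DataFrame has been computed, it can either serve as the input for the next operation or be saved as the final result.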
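    Both file outputs and the "result becomes the next step's input" pattern can be sketched as follows. MultiFormatSave and the json/csv subdirectory layout are hypothetical; the base directory can be a local path or, assuming the environment is configured for it, an hdfs:// URI.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical helper: save one result in two file formats and reuse it as input
object MultiFormatSave {
  // Saves the same result as JSON and as CSV (with a header row) under baseDir,
  // then reads the JSON copy back so it can feed a follow-up computation
  def saveAndReload(spark: SparkSession, df: DataFrame, baseDir: String): DataFrame = {
    df.write.mode(SaveMode.Overwrite).json(s"$baseDir/json")
    df.write.mode(SaveMode.Overwrite).option("header", "true").csv(s"$baseDir/csv")
    // JSON files store no schema, so Spark infers one on read; column order and types
    // may differ from the original (whole numbers, for instance, come back as long)
    spark.read.json(s"$baseDir/json")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("MultiFormatSave").getOrCreate()
    import spark.implicits._
    val result = Seq(("xixi", 10), ("haha", 20), ("licha", 30)).toDF("name", "age")
    // Hypothetical output directory
    val reloaded = saveAndReload(spark, result, "multi_format_out")
    // The reloaded DataFrame is an ordinary input for the next step
    reloaded.where($"age" > 15).show()
    spark.close()
  }
}
```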