Spark(26) -- SparkSQL整合hive以及sparkSQL使用

    科技2023-12-14  68

    SparkSQL官方文档(3.0.1): sql-programming-guide.html sql-data-sources-hive-tables

     Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如文本、Hive、Json等)。Spark SQL的其中一个分支就是Spark on Hive,也就是使用Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业。SparkSql整合hive就是获取hive表中的元数据信息,然后通过SparkSql来操作数据

    1. sparkSQL整合Hive

    第一步:将hive-site.xml拷贝到spark安装路径conf目录 node03执行以下命令来拷贝hive-site.xml到所有的spark安装服务器上面去

    cd /export/servers/hive-1.1.0-cdh5.14.0/conf cp hive-site.xml /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/ scp hive-site.xml node03:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/ scp hive-site.xml node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/

    第二步:将mysql的连接驱动包拷贝到spark的jars目录下 node03执行以下命令将连接驱动包拷贝到spark的jars目录下,三台机器都要进行拷贝

    cd /export/servers/hive-1.1.0-cdh5.14.0/lib cp mysql-connector-java-5.1.38.jar /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/ scp mysql-connector-java-5.1.38.jar node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/ scp mysql-connector-java-5.1.38.jar node03:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/

    第三步:测试sparksql整合hive是否成功 先启动hadoop集群,在启动spark集群,确保启动成功之后node01执行命令:

    cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0 bin/spark-sql --master spark://node01:7077 --executor-memory 1G --total-executor-cores 2

    指明master地址、每一个executor的内存大小、一共所需要的核数、 mysql数据库连接驱动。 执行成功后的界面:进入到spark-sql 客户端命令行界面 查看当前有哪些数据库, 并创建数据库

    show databases; create database sparkhive;

    看到数据的结果,说明sparksql整合hive成功! 日志太多,我们可以修改spark的日志输出级别(conf/log4j.properties)

    注意:  在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在那里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。

    所有在启动的时候需要加上这样一个参数:

    --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

     保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。  如果使用的是spark2.0之前的版本,由于没有sparkSession,不会有spark.sql.warehouse.dir配置项,不会出现上述问题。

    最后的执行脚本; node01执行以下命令重新进去spark-sql

    cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0 bin/spark-sql \ --master spark://node01:7077 \ --executor-memory 1G --total-executor-cores 2 \ --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

    2. IDEA整合Hive实战[同上]

    基本的操作:

    进入hive安装文档,启动hive,bin/hive 整合之后的person表查询: 原理  Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置  执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务  简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢  使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据API  在Spark2.0之后,SparkSession对HiveContext和SqlContext在进行了统一 可以通过操作SparkSession来操作HiveContext和SqlContext。

    2.1 Hive开启MetaStore服务

    1: 修改 hive/conf/hive-site.xml 新增如下配置 2: 后台启动 Hive MetaStore服务

    bin/hive --service metastore & nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log & <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://node01:9083</value> </property> </configuration>

    2.2 SparkSQL整合Hive MetaStore

     Spark有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据【上面案例】,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore  SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse,所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录  hive-site.xml 元数据仓库的位置等信息core-site.xml 安全相关的配置hdfs-site.xml HDFS 相关的配置使用IDEA本地测试的话直接把以上配置文件放在resources目录即可

    2.3 使用SparkSQL操作Hive表

    import org.apache.spark.sql.SparkSession /** * DESC:当前的完成是Spark和HIve的整合,默认选择的是derby保存元数据的信息,当前的案例就是将元数据信息保存在本地 * 1-创建SparkSession环境,在创建环境的时候增加对Hive的支持 * 2-利用HIve的查询数据库的方法,或创建数据表结构语句创建表 * 3-利用Hive导入数据的语句将本地的students.csv数据导入到Hive中 * 4-执行Hive的查询查看是否能够得到结果 */ object SparkOnHiveTest { def main(args: Array[String]): Unit = { //* 1-创建SparkSession环境,在创建环境的时候增加对Hive的支持 val spark: SparkSession = SparkSession.builder() .appName("SparkOnHiveTest") .master("local[*]") .enableHiveSupport() .getOrCreate() //* 2-利用HIve的查询数据库的方法,或创建数据表结构语句创建表 //spark.sql("truncate table student2") spark.sql("create table if not exists student2(id int,name String,age int) row format delimited fields terminated by ','") //* 3-利用Hive导入数据的语句将本地的students.csv数据导入到Hive中 spark.sql("load data local inpath './datasets/input/student.csv' overwrite into table student2") //* 4-执行Hive的查询查看是否能够得到结果 spark.sql("select * from student2").show() //+---+--------+---+ //| id| name|age| //+---+--------+---+ //| 1|zhangsan| 30| // | 2| lisi| 40| // | 3| wangwu| 50| // +---+--------+---+ spark.stop() } }

    展示出所有表数据:

    3. SparkSQL使用

     进入到sparkSQL之后便可以进行sparkSQL的开发了,但是实际工作当中我们一般都是将写好的HQL语句放到脚本文件里面去,然后通过定时执行脚本文件即可 第一步:准备表数据 node01服务器准备表数据

    cd /export/servers hdfs dfs -mkdir -p /sparkhivedatas vim sparkhive.txt 1 zhangsan 18 2 lisi 28 3 wangwu 20 4 zhaoliu 38 5 chenqi 45

    将数据上传到hdfs上面去

    cd /export/servers hdfs dfs -put sparkhive.txt /sparkhivedatas

    第二步:开发SparkSQL脚本,并加载表数据 node01执行以下命令开发sparkSQL脚本

    cd /export/servers vim sparkhive.sql create database if not exists sparkhivedb; create table if not exists sparkhivedb.user(id int,name string,age int) row format delimited fields terminated by ' '; load data inpath 'hdfs://node01:8020/sparkhivedatas' overwrite into table sparkhivedb.user; drop table if exists sparkhivedb.userresult; create table sparkhivedb.userresult as select * from sparkhivedb.user where age > 30;

    第三步:开发Shell脚本,使用shell脚本执行sparkSQL脚本 node01执行以下命令开发sparkSQL脚本

    cd /export/servers vim sparksql.sh #!/bin/bash SPARK_SQL="/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-sql --master spark://node01:7077 --executor-memory 1G - -total-executor-cores 2 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse" $SPARK_SQL -f /export/servers/sparkhive.sql #或者我们也可以直接将sql语句写到shell脚本里面通过 -e 来进行执行,例如 SPARK_HQL="create database if not exists sparkhqldb; create table if not exists sparkhqldb.myuser(id int,name string,age int) row format delimited fields terminated by ' ' ;" $SPARK_SQL -e "$SPARK_HQL"
    Processed: 0.025, SQL: 8