SparkSQL官方文档(3.0.1): sql-programming-guide.html sql-data-sources-hive-tables
Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如文本、Hive、Json等)。Spark SQL的其中一个分支就是Spark on Hive,也就是使用Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业。SparkSql整合hive就是获取hive表中的元数据信息,然后通过SparkSql来操作数据。
第一步:将hive-site.xml拷贝到spark安装路径conf目录 node03执行以下命令来拷贝hive-site.xml到所有的spark安装服务器上面去
cd /export/servers/hive-1.1.0-cdh5.14.0/conf cp hive-site.xml /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/ scp hive-site.xml node03:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/ scp hive-site.xml node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/第二步:将mysql的连接驱动包拷贝到spark的jars目录下 node03执行以下命令将连接驱动包拷贝到spark的jars目录下,三台机器都要进行拷贝
cd /export/servers/hive-1.1.0-cdh5.14.0/lib cp mysql-connector-java-5.1.38.jar /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/ scp mysql-connector-java-5.1.38.jar node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/ scp mysql-connector-java-5.1.38.jar node03:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/第三步:测试sparksql整合hive是否成功 先启动hadoop集群,在启动spark集群,确保启动成功之后node01执行命令:
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0 bin/spark-sql --master spark://node01:7077 --executor-memory 1G --total-executor-cores 2指明master地址、每一个executor的内存大小、一共所需要的核数、 mysql数据库连接驱动。 执行成功后的界面:进入到spark-sql 客户端命令行界面 查看当前有哪些数据库, 并创建数据库
show databases; create database sparkhive;看到数据的结果,说明sparksql整合hive成功! 日志太多,我们可以修改spark的日志输出级别(conf/log4j.properties)
注意: 在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在那里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。
所有在启动的时候需要加上这样一个参数:
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。 如果使用的是spark2.0之前的版本,由于没有sparkSession,不会有spark.sql.warehouse.dir配置项,不会出现上述问题。
最后的执行脚本; node01执行以下命令重新进去spark-sql
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0 bin/spark-sql \ --master spark://node01:7077 \ --executor-memory 1G --total-executor-cores 2 \ --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse基本的操作:
进入hive安装文档,启动hive,bin/hive 整合之后的person表查询: 原理 Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置 执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务 简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢 使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表 所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据API 在Spark2.0之后,SparkSession对HiveContext和SqlContext在进行了统一 可以通过操作SparkSession来操作HiveContext和SqlContext。1: 修改 hive/conf/hive-site.xml 新增如下配置 2: 后台启动 Hive MetaStore服务
bin/hive --service metastore & nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log & <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://node01:9083</value> </property> </configuration>Spark有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据【上面案例】,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse,所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录 hive-site.xml 元数据仓库的位置等信息 core-site.xml 安全相关的配置 hdfs-site.xml HDFS 相关的配置 使用IDEA本地测试的话直接把以上配置文件放在resources目录即可
展示出所有表数据:
进入到sparkSQL之后便可以进行sparkSQL的开发了,但是实际工作当中我们一般都是将写好的HQL语句放到脚本文件里面去,然后通过定时执行脚本文件即可 第一步:准备表数据 node01服务器准备表数据
cd /export/servers hdfs dfs -mkdir -p /sparkhivedatas vim sparkhive.txt 1 zhangsan 18 2 lisi 28 3 wangwu 20 4 zhaoliu 38 5 chenqi 45将数据上传到hdfs上面去
cd /export/servers hdfs dfs -put sparkhive.txt /sparkhivedatas第二步:开发SparkSQL脚本,并加载表数据 node01执行以下命令开发sparkSQL脚本
cd /export/servers vim sparkhive.sql create database if not exists sparkhivedb; create table if not exists sparkhivedb.user(id int,name string,age int) row format delimited fields terminated by ' '; load data inpath 'hdfs://node01:8020/sparkhivedatas' overwrite into table sparkhivedb.user; drop table if exists sparkhivedb.userresult; create table sparkhivedb.userresult as select * from sparkhivedb.user where age > 30;第三步:开发Shell脚本,使用shell脚本执行sparkSQL脚本 node01执行以下命令开发sparkSQL脚本
cd /export/servers vim sparksql.sh #!/bin/bash SPARK_SQL="/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-sql --master spark://node01:7077 --executor-memory 1G - -total-executor-cores 2 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse" $SPARK_SQL -f /export/servers/sparkhive.sql #或者我们也可以直接将sql语句写到shell脚本里面通过 -e 来进行执行,例如 SPARK_HQL="create database if not exists sparkhqldb; create table if not exists sparkhqldb.myuser(id int,name string,age int) row format delimited fields terminated by ' ' ;" $SPARK_SQL -e "$SPARK_HQL"