Spark(29) -- SparkSQL底层如何执行及分布式SQL引擎

    科技2024-01-22  100

    1. RDD 和 SparkSQL 运行时的区别

    RDD 的运行流程 大致运行步骤

    先将 RDD 解析为由 Stage 组成的 DAG, 后将 Stage 转为 Task 直接运行

    问题

    任务会按照代码所示运行, 依赖开发者的优化, 开发者的会在很大程度上影响运行效率

    解决办法

    创建一个组件, 帮助开发者修改和优化代码, 但这在 RDD 上是无法实现的

    为什么 RDD 无法自我优化?

    RDD 没有 Schema 信息RDD 可以同时处理结构化和非结构化的数据

    SparkSQL 提供了什么?  和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是直接生成计划交给集群执行, 而是经过了一个叫做 Catalyst 的优化器, 这个优化器能够自动帮助开发者优化代码  也就是说, 在 SparkSQL 中, 开发者的代码即使不够优化, 也会被优化为相对较好的形式去执行

    为什么 SparkSQL 提供了这种能力?  首先, SparkSQL 大部分情况用于处理结构化数据和半结构化数据, 所以 SparkSQL 可以获知数据的 Schema, 从而根据其 Schema 来进行优化

    2. Catalyst

     为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大致如下:

    API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句收到 SQL 语句以后, 将其交给 Catalyst, Catalyst 负责解析 SQL, 生成执行计划等Catalyst 的输出应该是 RDD 的执行计划最终交由集群运行

    具体流程: Step 1 : 解析 SQL, 并且生成 AST (抽象语法树) Step 2 : 在 AST 中加入元数据信息, 做这一步主要是为了一些优化, 例如 col = col 这样的条件, 下图是一个简略图, 便于理解

    score.id → id#1#L 为 score.id 生成 id 为 1, 类型是 Longscore.math_score → math_score#2#L 为 score.math_score 生成 id 为 2, 类型为 Longpeople.id → id#3#L 为 people.id 生成 id 为 3, 类型为 Longpeople.age → age#4#L 为 people.age 生成 id 为 4, 类型为 Long

    Step 3 : 对已经加入元数据的 AST, 输入优化器, 进行优化, 从两种常见的优化开始, 简单介绍:  谓词下推 Predicate Pushdown, 将 Filter 这种可以减小数据集的操作下推, 放在 Scan 的位置, 这样可以减少操作时候的数据量。

    列值裁剪 Column Pruning, 在谓词下推后, people 表之上的操作只用到了 id 列, 所以可以把其它列裁剪掉, 这样可以减少处理的数据量, 从而优化处理速度还有其余很多优化点, 大概一共有一二百种, 随着 SparkSQL 的发展, 还会越来越多, 感兴趣的同学可以继续通过源码了解, 源码在 org.apache.spark.sql.catalyst.optimizer.Optimizer

    Step 4 : 上面的过程生成的 AST 其实最终还没办法直接运行, 这个 AST 叫做 逻辑计划, 结束后, 需要生成 物理计划, 从而生成 RDD 来运行  在生成物理计划的时候, 会经过成本模型对整棵树再次执行优化, 选择一个更好的计划  在生成物理计划以后, 因为考虑到性能, 所以会使用代码生成, 在机器中运行 可以使用 queryExecution 方法查看逻辑执行计划, 使用 explain 方法查看物理执行计划 也可以使用 Spark WebUI 进行查看  SparkSQL 和 RDD 不同的主要点是在于其所操作的数据是结构化的, 提供了对数据更强的感知和分析能力, 能够对代码进行更深层的优化, 而这种能力是由一个叫做 Catalyst 的优化器所提供的。  Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行

    2.1 详解

     在前面【案例:电影评分数据分析】中,运行应用程序代码,通过WEB UI界面监控可以看出,无论使用DSL还是SQL,构建Job的DAG图一样的,性能是一样的,原因在于SparkSQL中引擎:Catalyst:将SQL和DSL转换为相同逻辑计划

     Spark SQL是Spark最新,技术最复杂的组件之一。它为SQL查询和新的DataFrame API提供支持。Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言功能(例如Scala的模式匹配和quasiquotes)来构建可扩展的查询优化器

     SparkSQL的Catalyst优化器是整个SparkSQL pipeline的中间核心部分,其执行策略主要两方向:

    基于规则优化/Rule Based Optimizer/RBO;基于代价优化/Cost Based Optimizer/CBO;

     从上图可见,无论是直接使用SQL语句还是使用 ataFrame,都会经过一些列步骤转换成DAG对RDD的操作。  Catalyst工作流程:SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved Logical Plan;Unresolved Logical Plan通过Analyzer模块借助于数据元数据解析为Logical Plan;此时再通过各种基于规则的Optimizer进行深入优化,得到Optimized Logical Plan;优化后的逻辑执行计划依然是逻辑的,需要将逻辑计划转化为Physical Plan。

    核心三个点:

    第一点、Parser,第三方类库ANTLR实现。将sql字符串切分成Token,根据语义规则解析成一颗AST语法树;第二点、Analyzer,Unresolved Logical Plan,进行数据类型绑定和函数绑定;第三点、Optimizer,规则优化就是模式匹配满足特定规则的节点等价转换为另一颗语法树;

    3. 分布式SQL引擎

    回顾一下,如何使用Hive进行数据分析的,提供哪些方式交互分析???

    方式一:交互式命令行(CLI) bin/hive,编写SQL语句及DDL语句 方式二:启动服务HiveServer2(Hive ThriftServer2) 将Hive当做一个服务启动(类似MySQL数据库,启动一个服务),端口为100001)、交互式命令行,bin/beeline,CDH 版本HIVE建议使用此种方式,CLI方式过时2)、JDBC/ODBC方式,类似MySQL中JDBC/ODBC方式 SparkSQL模块从Hive框架衍生发展而来,所以Hive提供的所有功能(数据分析交互式方式)都支持,文档:http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html。

    3.1 Spark SQL CLI

    SparkSQL提供spark-sql命令,类似Hive中bin/hive命令,专门编写SQL分析,启动命令如下:

    SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=4

    编写SQL执行,截图如下: 此种方式,目前企业使用较少,主要使用下面所述ThriftServer服务,通过Beeline连接执行SQL。

    3.2 ThriftServer JDBC/ODBC Server

    Spark Thrift Server将Spark Applicaiton当做一个服务运行,提供Beeline客户端和JDBC方式访问,与Hive中HiveServer2服务一样的。此种方式必须掌握:在企业中使用PySpark和SQL分析数据,尤其针对数据分析行业。 Spark Thrift JDBC/ODBC server 依赖于HiveServer2服务(依赖JAR包),所有要想使用此功能,在编译Spark源码时,支持Hive Thrift。 注意:启动Spark Thrift JDBC/ODBC Server时,不需要HiveServer2服务。 在$SPARK_HOME目录下的sbin目录,有相关的服务启动命令:

    SPARK_HOME=/export/server/spark $SPARK_HOME/sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=10000 \ --hiveconf hive.server2.thrift.bind.host=node1.itcast.cn \ --master local[2]

    监控WEB UI界面:node1:4040/jobs/

    3.2.1 beeline 客户端

    SparkSQL类似Hive提供beeline客户端命令行连接ThriftServer,启动命令如下: /export/server/spark/bin/beeline Beeline version 1.2.1.spark2 by Apache Hive beeline> !connect jdbc:hive2://node1.itcast.cn:10000 Connecting to jdbc:hive2://node1.itcast.cn:10000 Enter username for jdbc:hive2://node1.itcast.cn:10000: root Enter password for jdbc:hive2://node1.itcast.cn:10000: ****

    编写SQL语句执行分析: 在实际大数据分析项目中,使用SparkSQL时,往往启动一个ThriftServer服务,分配较多资源(Executor数目和内存、CPU),不同的用户启动beeline客户端连接,编写SQL语句分析数据。 cache table之后可以在web页面查看

    3.2.2 JDBC/ODBC 客户端

    SparkSQL中提供类似JDBC/ODBC方式,连接Spark ThriftServer服务,执行SQL语句,首先添加Maven依赖库: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_2.11</artifactId> <version>2.4.5</version> </dependency>

    参考文档: HiveServer2Clients-JDBC

    范例演示: 采用JDBC方式读取Hive中db_hive.emp表的数据。

    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet} /** * SparkSQL 启动ThriftServer服务,通过JDBC方式访问数据分析查询 * i). 通过Java JDBC的方式,来访问Thrift JDBC/ODBC server,调用Spark SQL,并直接查询Hive中的数据 * ii). 通过Java JDBC的方式,必须通过HTTP传输协议发送thrift RPC消息,Thrift JDBC/ODBC server必须通过上面命令启动HTTP模式 */ object SparkThriftJDBC { def main(args: Array[String]): Unit = { // 定义相关实例对象,未进行初始化 var conn: Connection = null var pstmt: PreparedStatement = null var rs: ResultSet = null try { // TODO: a. 加载驱动类 Class.forName("org.apache.hive.jdbc.HiveDriver") // TODO: b. 获取连接Connection conn = DriverManager.getConnection( "jdbc:hive2://node1.itcast.cn:10000/db_hive", "root", "123456" ) // TODO: c. 构建查询语句 val sqlStr: String = """ |select e.ename, e.sal, d.dname from emp e join dept d on e.deptno = d.deptno """.stripMargin pstmt = conn.prepareStatement(sqlStr) // TODO: d. 执行查询,获取结果 rs = pstmt.executeQuery() // 打印查询结果 while (rs.next()) { println(s"empno = ${rs.getInt(1)}, ename = ${rs.getString(2)}, sal = ${rs.getDouble(3)}, dname = ${rs.getString(4)}") } } catch { case e: Exception => e.printStackTrace() } finally { if (null != rs) rs.close() if (null != pstmt) pstmt.close() if (null != conn) conn.close() } } }
    Processed: 0.022, SQL: 8