Spark(33) -- Spark SQL补充 -- 连接

    科技2024-03-05  70

    1. 无类型连接算子 join 的 API

    Step 1: 什么是连接  按照 PostgreSQL 的文档中所说, 只要能在一个查询中, 同一时间并发的访问多条数据, 就叫做连接. 做到这件事有两种方式

     一种是把两张表在逻辑上连接起来, 一条语句中同时访问两张表

    select * from user join address on user.address_id = address.id

     还有一种方式就是表连接自己, 一条语句也能访问自己中的多条数据

    select * from user u1 join (select * from user) u2 on u1.id = u2.id

    Step 2: join 算子的使用非常简单, 大致的调用方式如下

    join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

    Step 3: 简单连接案例  表结构如下

    +---+------+------+ +---+---------+ | id| name|cityId| | id| name| +---+------+------+ +---+---------+ | 0| Lucy| 0| | 0| Beijing| | 1| Lily| 0| | 1| Shanghai| | 2| Tim| 2| | 2|Guangzhou| | 3|Danial| 0| +---+---------+ +---+------+------+

     如果希望对这两张表进行连接, 首先应该注意的是可以连接的字段, 比如说此处的左侧表 cityId 和右侧表 id 就是可以连接的字段, 使用 join 算子就可以将两个表连接起来, 进行统一的查询

    val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0)) .toDF("id", "name", "cityId") val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou")) .toDF("id", "name") person.join(cities, person.col("cityId") === cities.col("id")) .select(person.col("id"), person.col("name"), cities.col("name") as "city") .show() /** * 执行结果: * * +---+------+---------+ * | id| name| city| * +---+------+---------+ * | 0| Lucy| Beijing| * | 1| Lily| Beijing| * | 2| Tim|Guangzhou| * | 3|Danial| Beijing| * +---+------+---------+ */

    Step 4: 什么是连接?  现在两个表连接得到了如下的表

    +---+------+---------+ | id| name| city| +---+------+---------+ | 0| Lucy| Beijing| | 1| Lily| Beijing| | 2| Tim|Guangzhou| | 3|Danial| Beijing| +---+------+---------+

     通过对这张表的查询, 这个查询是作用于两张表的, 所以是同一时间访问了多条数据

    spark.sql("select name from user_city where city = 'Beijing'").show() /** * 执行结果 * * +------+ * | name| * +------+ * | Lucy| * | Lily| * |Danial| * +------+ */

    2. 连接类型

     如果要运行如下代码, 需要先进行数据准备

    private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3)) .toDF("id", "name", "cityId") person.createOrReplaceTempView("person") val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou")) .toDF("id", "name") cities.createOrReplaceTempView("cities")

    3. 广播连接

    Step 1: 正常情况下的 Join 过程  Join 会在集群中分发两个数据集, 两个数据集都要复制到 Reducer 端, 是一个非常复杂和标准的 ShuffleDependency, 有什么可以优化效率吗?

    Step 2: Map 端 Join  前面图中看的过程, 之所以说它效率很低, 原因是需要在集群中进行数据拷贝, 如果能减少数据拷贝, 就能减少开销 如果能够只分发一个较小的数据集呢?  可以将小数据集收集起来, 分发给每一个 Executor, 然后在需要 Join 的时候, 让较大的数据集在 Map 端直接获取小数据集, 从而进行 Join, 这种方式是不需要进行 Shuffle 的, 所以称之为 Map 端 Join

    Step 3: Map 端 Join 的常规实现  如果使用 RDD 的话, 该如何实现 Map 端 Join 呢?

    val personRDD = spark.sparkContext.parallelize(Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3))) val citiesRDD = spark.sparkContext.parallelize(Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))) val citiesBroadcast = spark.sparkContext.broadcast(citiesRDD.collectAsMap()) val result = personRDD.mapPartitions( iter => { val citiesMap = citiesBroadcast.value // 使用列表生成式 yield 生成列表 val result = for (person <- iter if citiesMap.contains(person._3)) yield (person._1, person._2, citiesMap(person._3)) result } ).collect() result.foreach(println(_))

    Step 4: 使用 Dataset 实现 Join 的时候会自动进行 Map 端 Join  自动进行 Map 端 Join 需要依赖一个系统参数 spark.sql.autoBroadcastJoinThreshold, 当数据集小于这个参数的大小时, 会自动进行 Map 端 Join  如下, 开启自动 Join

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toInt / 1024 / 1024) println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)

     当关闭这个参数的时候, 则不会自动 Map 端 Join 了

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)

    Step 5: 也可以使用函数强制开启 Map 端 Join  在使用 Dataset 的 join 时, 可以使用 broadcast 函数来实现 Map 端 Join

    import org.apache.spark.sql.functions._ spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) println(person.crossJoin(broadcast(cities)).queryExecution.sparkPlan.numberedTreeString)

     即使是使用 SQL 也可以使用特殊的语法开启

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) val resultDF = spark.sql( """ |select /*+ MAPJOIN (rt) */ * from person cross join cities rt """.stripMargin) println(resultDF.queryExecution.sparkPlan.numberedTreeString)
    Processed: 0.020, SQL: 8