08 RDD.cache

    科技2025-06-11  46

    前言

    最近翻了一下 官方文档中的 RDD Programming Guide, 里面有一些 关于 cache 的介绍 

    呵呵 这里重新整理一下, 当然 只是一最简单的情况(1 worker, memory, 1 replics)做一个整理 

     

    这部分的官方文档镇楼, 看原文请移步 RDD Programming Guide - RDD Persistence

    以下图片截图于 RDD Persistence

     

     

    以下调试基于 jdk1.8 + scala2.11 + spark2.4.5 

     

     

    测试用例

    package com.hx.test import org.apache.spark.{SparkConf, SparkContext} /** * Test24CacheUasge * * @author Jerry.X.He <970655147@qq.com> * @version 1.0 * @date 2020-10-03 18:30 */ object Test24CacheUasge { // Test24CacheUasge def main(args: Array[String]): Unit = { val logFile = "resources/Test01WordCount.txt" val conf = new SparkConf().setAppName("Test24CacheUasge").setMaster("local[1]") val sc = new SparkContext(conf) val allWords = sc.textFile(logFile, 2) .flatMap(line => line.split(" ")) .map(word => { println(word) word }) // allWords.cache() // action 1 allWords .map(word => (word, 1)) .reduceByKey((left, right) => left + right) .collect() .foreach(entry => println(entry._1, entry._2)) // action 2 println(allWords.count()) System.in.read() sc.stop() } }

     

    这里目前是 暂时注释掉了 allWords.cache(), 输出结果如下 

    234 345 123 346 234 123 124 (345,1) (123,2) (234,2) (346,1) (124,1) 234 345 123 346 234 123 124 7

    从结果上面来看, 两个 action 都得到了正确 的结果, 如下代码片段 是被执行了两次 

    .map(word => { println(word) word })

     

    我们加上 allWords.cache() 之后, 输出结果如下, 可以发现 两个 action 都得到了正确 的结果, 输出 word 的代码片段只被执行了一次 

    234 345 123 346 234 123 124 (345,1) (123,2) (234,2) (346,1) (124,1) 7

     

    然后看一下 spark UI 上面 count 这个 job 对应的 stage, 依赖的最后一个 rdd 被标记为 cached 

     

     

    RDD.cache 

    我们来看一下 RDD.cache 的实现 

    给当前 rdd 打上了标记, 需要缓存在 内存中, 并向 sparkContext 注册 

     

     

    缓存计算数据

    如下图, 可以看到 发现 allWords 需要缓存, 这个是 allWords 的 partition0 计算之后, 然后进行缓存的操作, 吧 partition0 的结果 ["234", "345", "123", "346", "234"] 放到了 memoryStore, 同理 会把 partition1 的结果 ["123", "124"] 放到 memoryStore

     

    我们来看一下上下文情况, Task 计算的 RDD 是一个 MapPartitionsRDD, func 是 com/hx/test/Test24CacheUasge$$anonfun$main$2, 我们参照一下 下面的匿名函数参照图来看一下 

    可以确认 Task 计算的 MapPartitionsRDD 是 "map(word => (word, 1))" 产生的 RDD 

    所以当前的情况是 action 1[collect] 触发了 action, 根据 Shuffle Dependeciy 划分 Stage, "reduceByKey((left, right) => left + right)" 之前的 RDD 链对应于一个 ShuffleMapStage, "reduceByKey((left, right) => left + right)" 生成的 ShuffleRDD 对应于一个 ResultStage 

    "reduceByKey((left, right) => left + right)" 之前的 RDD 链进行计算的时候, 第一个为 "map(word => (word, 1))" 生成的 rdd, 不需要缓存, 因此直接走的时候 computeOrReadCheckpoint, 其依赖的 RDD 为 allWords, 是需要缓存的, 走的是 getOrCompute 

     

    各个匿名函数的分布情况如下 

     

    然后 之后的流程是 从 memoryStore 中读取数据, 返回给 调用端 RDD.getOrCompute 

     

     

    缓存的使用

    然后是 缓存的使用, getOrElseUpdate 里面首先尝试从本地 或者 其他节点获取缓存, 这里是从本地获取到了缓存的数据 

    直接返回给调用端 RDD.getOrCompute 

     

    根据持久化机制获取到了数据, 直接返回 blockResult, 上一层级封装成 Iterator 

     

     

    看完这些 应该能够解释用例中的问题了吧   

     

     

    完 

     

     

    Processed: 0.013, SQL: 8