最近翻了一下 官方文档中的 RDD Programming Guide, 里面有一些 关于 cache 的介绍
呵呵 这里重新整理一下, 当然 只是一最简单的情况(1 worker, memory, 1 replics)做一个整理
这部分的官方文档镇楼, 看原文请移步 RDD Programming Guide - RDD Persistence
以下图片截图于 RDD Persistence
以下调试基于 jdk1.8 + scala2.11 + spark2.4.5
这里目前是 暂时注释掉了 allWords.cache(), 输出结果如下
234 345 123 346 234 123 124 (345,1) (123,2) (234,2) (346,1) (124,1) 234 345 123 346 234 123 124 7从结果上面来看, 两个 action 都得到了正确 的结果, 如下代码片段 是被执行了两次
.map(word => { println(word) word })
我们加上 allWords.cache() 之后, 输出结果如下, 可以发现 两个 action 都得到了正确 的结果, 输出 word 的代码片段只被执行了一次
234 345 123 346 234 123 124 (345,1) (123,2) (234,2) (346,1) (124,1) 7
然后看一下 spark UI 上面 count 这个 job 对应的 stage, 依赖的最后一个 rdd 被标记为 cached
我们来看一下 RDD.cache 的实现
给当前 rdd 打上了标记, 需要缓存在 内存中, 并向 sparkContext 注册
如下图, 可以看到 发现 allWords 需要缓存, 这个是 allWords 的 partition0 计算之后, 然后进行缓存的操作, 吧 partition0 的结果 ["234", "345", "123", "346", "234"] 放到了 memoryStore, 同理 会把 partition1 的结果 ["123", "124"] 放到 memoryStore
我们来看一下上下文情况, Task 计算的 RDD 是一个 MapPartitionsRDD, func 是 com/hx/test/Test24CacheUasge$$anonfun$main$2, 我们参照一下 下面的匿名函数参照图来看一下
可以确认 Task 计算的 MapPartitionsRDD 是 "map(word => (word, 1))" 产生的 RDD
所以当前的情况是 action 1[collect] 触发了 action, 根据 Shuffle Dependeciy 划分 Stage, "reduceByKey((left, right) => left + right)" 之前的 RDD 链对应于一个 ShuffleMapStage, "reduceByKey((left, right) => left + right)" 生成的 ShuffleRDD 对应于一个 ResultStage
"reduceByKey((left, right) => left + right)" 之前的 RDD 链进行计算的时候, 第一个为 "map(word => (word, 1))" 生成的 rdd, 不需要缓存, 因此直接走的时候 computeOrReadCheckpoint, 其依赖的 RDD 为 allWords, 是需要缓存的, 走的是 getOrCompute
各个匿名函数的分布情况如下
然后 之后的流程是 从 memoryStore 中读取数据, 返回给 调用端 RDD.getOrCompute
然后是 缓存的使用, getOrElseUpdate 里面首先尝试从本地 或者 其他节点获取缓存, 这里是从本地获取到了缓存的数据
直接返回给调用端 RDD.getOrCompute
根据持久化机制获取到了数据, 直接返回 blockResult, 上一层级封装成 Iterator
看完这些 应该能够解释用例中的问题了吧
完