09 Understanding Closures

    Tech  2025-09-19  102

    Preface

    I recently read through the RDD Programming Guide in the official documentation, which includes a section on closures.

    Here I rework that material into my own notes, covering only the simplest setup (1 worker, in memory, 1 replica).

     

    This part of the official documentation is quoted up front; to read the original, see RDD Programming Guide - Understanding closures

    The images below are screenshots taken from Understanding closures

     

     

    The debugging below was done with JDK 1.8 + Scala 2.11 + Spark 2.4.5

     

     

    Test code

    package com.hx.test

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Test25Increment
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2020-10-04 16:30
     */
    object Test25Increment {

      // Test25Increment
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("Test25Increment").setMaster("local[1]")
        val sc = new SparkContext(conf)

        var counter = 2
        val rdd = sc.parallelize(1 to 100, 2)

        rdd.foreach(x => counter += x) // runs inside the tasks, on a deserialized copy of counter
        println(counter)               // driver-side counter: still 2

        // keep the JVM alive so it can be inspected while debugging
        System.in.read()
        sc.stop()
      }
    }

     

    The final output is as follows

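    If you have some familiarity with Spark, or have read the documentation above, this result should not be a surprise: the driver prints 2, not 2 + 5050 = 5052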

    20/10/08 15:50:52 INFO DAGScheduler: ResultStage 0 (foreach at Test25Increment.scala:23) finished in 0.325 s
    20/10/08 15:50:52 INFO DAGScheduler: Job 0 finished: foreach at Test25Increment.scala:23, took 0.381354 s
    2

     

     

    How the user function is invoked

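    Setting a breakpoint where ResultTask executes the user function shows that it is wrapped in two layers of SparkContext functions. The innermost one is the core user function "com/hx/test/Test25Increment$$anonfun$main$1", i.e. the compiled representation of the anonymous function "x => counter += x" above, and it has a member field counter whose value is 2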

    Also note where func comes from: it is deserialized from taskBinary

     

     

    Serializing the user function

    Let's look at where the function is serialized: the Tuple2 combining rdd and func is serialized into taskBinary
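    This serialize-then-deserialize step can be imitated outside Spark. A minimal sketch under stated assumptions: plain Java serialization stands in for Spark's closure serializer, a Vector stands in for the rdd, and AddToCounter is a hypothetical stand-in for Test25Increment$$anonfun$main$1. The (data, func) pair is packed into one byte array, as taskBinary is, and then unpacked; the unpacked objects are new instances, not references to the driver's.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import scala.runtime.{AbstractFunction1, IntRef}

// Hypothetical stand-in for Test25Increment$$anonfun$main$1: holds the captured IntRef
final class AddToCounter(val counter: IntRef)
    extends AbstractFunction1[Int, Unit] with Serializable {
  def apply(x: Int): Unit = counter.elem += x
}

object TaskBinarySketch {
  // Plain Java serialization, assumed here in place of Spark's closure serializer
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      // Resolve classes through this code's class loader, falling back to the default lookup
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverFunc = new AddToCounter(new IntRef(2))
    val data: Vector[Int] = (1 to 100).toVector // stands in for the rdd

    // "Driver": serialize the (data, func) pair into one byte array, like taskBinary
    val taskBinary = serialize((data, driverFunc))

    // "Executor": deserializing yields fresh objects
    val (taskData, taskFunc) = deserialize[(Vector[Int], AddToCounter)](taskBinary)
    println(taskData.size)                          // 100
    println(taskFunc ne driverFunc)                 // true
    println(taskFunc.counter ne driverFunc.counter) // true
    println(taskFunc.counter.elem)                  // 2
  }
}
```

    The class-loader override is there because default Java deserialization can fail to find user classes under layered class loaders; Spark's own deserializer is likewise given a class loader.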

    Note that func, cleanedFunc and cleanF are all serializable; the one we care about here is cleanF, i.e. "com/hx/test/Test25Increment$$anonfun$main$1"
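    Being serializable matters because everything a function object captures is serialized along with it. A minimal sketch, assuming plain Java serialization and two hypothetical closure-like classes: AddToRef captures a serializable IntRef (as counter$1 does), while SendEach captures FakeConnection, which does not implement java.io.Serializable. In Spark, a closure of the second kind typically surfaces as a "Task not serializable" error.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.runtime.{AbstractFunction1, IntRef}

// Hypothetical resource class that does NOT implement java.io.Serializable
final class FakeConnection {
  def send(x: Int): Unit = ()
}

// Closure-like class capturing a serializable IntRef (like counter$1)
final class AddToRef(ref: IntRef) extends AbstractFunction1[Int, Unit] with Serializable {
  def apply(x: Int): Unit = ref.elem += x
}

// Closure-like class capturing a non-serializable object
final class SendEach(conn: FakeConnection) extends AbstractFunction1[Int, Unit] with Serializable {
  def apply(x: Int): Unit = conn.send(x)
}

object SerializabilityCheck {
  // True if Java serialization accepts the whole object graph reachable from obj
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new AddToRef(new IntRef(2))))      // true
    println(isSerializable(new SendEach(new FakeConnection))) // false
  }
}
```

    Both classes are marked Serializable; only the captured field decides the outcome.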

    We won't go into the details of serialization here; for the serialization of simpler data, see: 05 Spark's WordCount

     

     

    Where does the function com/hx/test/Test25Increment$$anonfun$main$1 come from?

    Let's look at the driver program

     

    Note the following code fragment

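    It creates a com/hx/test/Test25Increment$$anonfun$main$1 object and invokes its constructor Test25Increment$$anonfun$main$1.<init>(Lscala/runtime/IntRef;)V, passing local variable slot 4, which holds counter. Because counter is a local var captured by a closure, scalac stores it in a scala.runtime.IntRef instead of a plain int, which is why the constructor takes an IntRef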

    It then passes that com/hx/test/Test25Increment$$anonfun$main$1 instance as the argument to rdd.foreach

    LINENUMBER 23 L4
    ALOAD 5
    NEW com/hx/test/Test25Increment$$anonfun$main$1
    DUP
    ALOAD 4
    INVOKESPECIAL com/hx/test/Test25Increment$$anonfun$main$1.<init> (Lscala/runtime/IntRef;)V
    INVOKEVIRTUAL org/apache/spark/rdd/RDD.foreach (Lscala/Function1;)V
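    Written back as Scala source, the bytecode above corresponds roughly to the sketch below. IncrementClosure is a hypothetical, hand-written stand-in for the compiler-generated Test25Increment$$anonfun$main$1 (scalac 2.12+ compiles lambdas differently, via LambdaMetafactory), and run() plays the role of main without Spark. The captured var becomes an IntRef that both the method and the function object point to, so a purely local call does change counter.

```scala
import scala.runtime.{AbstractFunction1, IntRef}

// Hypothetical hand-written equivalent of the compiler-generated
// Test25Increment$$anonfun$main$1 for `x => counter += x`
final class IncrementClosure(counter: IntRef)
    extends AbstractFunction1[Int, Unit] with Serializable {
  // Reads and writes the shared IntRef, like apply$mcVI$sp in the bytecode
  def apply(x: Int): Unit = counter.elem += x
}

object DesugarDemo {
  // Plays the role of main, minus Spark
  def run(): Int = {
    val counter = new IntRef(2)           // `var counter = 2`, i.e. slot 4
    val f = new IncrementClosure(counter) // NEW / DUP / ALOAD 4 / INVOKESPECIAL <init>
    (1 to 100).foreach(f)                 // local call, no serialization involved
    counter.elem                          // 2 + 5050
  }

  def main(args: Array[String]): Unit = println(run()) // 5052
}
```

    The contrast with Spark is the point: here the function object and run() share one IntRef, so the update is visible.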

     

     

    About com/hx/test/Test25Increment$$anonfun$main$1

    Looking at the class information of com/hx/test/Test25Increment$$anonfun$main$1: it extends scala.runtime.AbstractFunction1$mcVI$sp and implements scala.Serializable [trait Serializable extends scala.Any with java.io.Serializable]

    master:test jerry$ javap -c -v Test25Increment$$anonfun$main$1
    Classfile /Users/jerry/IdeaProjects/HelloSpark/target/production/HelloSpark/com/hx/test/Test25IncrementTmp.class
      Last modified Oct 4, 2020; size 1285 bytes
      MD5 checksum 336e2ff93fb97e506f4b80784f723f51
      Compiled from "Test25Increment.scala"
    public final class com.hx.test.Test25Increment$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable
      minor version: 0
      major version: 50
      flags: ACC_PUBLIC, ACC_FINAL, ACC_SUPER
    Constant pool:
       #1 = Utf8 com/hx/test/Test25Increment$$anonfun$main$1
       #2 = Class #1 // com/hx/test/Test25Increment$$anonfun$main$1
       #3 = Utf8 scala/runtime/AbstractFunction1$mcVI$sp
       #4 = Class #3 // scala/runtime/AbstractFunction1$mcVI$sp
       #5 = Utf8 scala/Serializable
       #6 = Class #5 // scala/Serializable
       #7 = Utf8 Test25Increment.scala
       #8 = Utf8 com/hx/test/Test25Increment$
       #9 = Class #8 // com/hx/test/Test25Increment$
      #10 = Utf8 main
      #11 = Utf8 ([Ljava/lang/String;)V
      #12 = NameAndType #10:#11 // main:([Ljava/lang/String;)V
      #13 = Utf8 serialVersionUID
      #14 = Utf8 J
      #15 = Long 0l
      #17 = Utf8 counter$1
      #18 = Utf8 Lscala/runtime/IntRef;
      #19 = Utf8 apply
      #20 = Utf8 (I)V
      #21 = Utf8 apply$mcVI$sp
      #22 = NameAndType #21:#20 // apply$mcVI$sp:(I)V
      #23 = Methodref #2.#22 // com/hx/test/Test25Increment$$anonfun$main$1.apply$mcVI$sp:(I)V
      #24 = Utf8 this
      #25 = Utf8 Lcom/hx/test/Test25Increment$$anonfun$main$1;
      #26 = Utf8 x
      #27 = Utf8 I
      #28 = NameAndType #17:#18 // counter$1:Lscala/runtime/IntRef;
      #29 = Fieldref #2.#28 // com/hx/test/Test25Increment$$anonfun$main$1.counter$1:Lscala/runtime/IntRef;
      #30 = Utf8 scala/runtime/IntRef
      #31 = Class #30 // scala/runtime/IntRef
      #32 = Utf8 elem
      #33 = NameAndType #32:#27 // elem:I
      #34 = Fieldref #31.#33 // scala/runtime/IntRef.elem:I
      #35 = Utf8 (Ljava/lang/Object;)Ljava/lang/Object;
      #36 = Utf8 scala/runtime/BoxesRunTime
      #37 = Class #36 // scala/runtime/BoxesRunTime
      #38 = Utf8 unboxToInt
      #39 = Utf8 (Ljava/lang/Object;)I
      #40 = NameAndType #38:#39 // unboxToInt:(Ljava/lang/Object;)I
      #41 = Methodref #37.#40 // scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
      #42 = NameAndType #19:#20 // apply:(I)V
      #43 = Methodref #2.#42 // com/hx/test/Test25Increment$$anonfun$main$1.apply:(I)V
      #44 = Utf8 scala/runtime/BoxedUnit
      #45 = Class #44 // scala/runtime/BoxedUnit
      #46 = Utf8 UNIT
      #47 = Utf8 Lscala/runtime/BoxedUnit;
      #48 = NameAndType #46:#47 // UNIT:Lscala/runtime/BoxedUnit;
      #49 = Fieldref #45.#48 // scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
      #50 = Utf8 v1
      #51 = Utf8 Ljava/lang/Object;
      #52 = Utf8 <init>
      #53 = Utf8 (Lscala/runtime/IntRef;)V
      #54 = Utf8 ()V
      #55 = NameAndType #52:#54 // "<init>":()V
      #56 = Methodref #4.#55 // scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
      #57 = Utf8 ConstantValue
      #58 = Utf8 Code
      #59 = Utf8 LocalVariableTable
      #60 = Utf8 LineNumberTable
      #61 = Utf8 SourceFile
      #62 = Utf8 EnclosingMethod
      #63 = Utf8 InnerClasses
      #64 = Utf8 ScalaInlineInfo
      #65 = Utf8 Scala
    {
      public static final long serialVersionUID;
        descriptor: J
        flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL
        ConstantValue: long 0l
      public final void apply(int);
        descriptor: (I)V
        flags: ACC_PUBLIC, ACC_FINAL
        Code:
          stack=2, locals=2, args_size=2
             0: aload_0
             1: iload_1
             2: invokevirtual #23 // Method apply$mcVI$sp:(I)V
             5: return
          LocalVariableTable:
            Start Length Slot Name Signature
                0      6    0 this Lcom/hx/test/Test25Increment$$anonfun$main$1;
                0      6    1 x I
          LineNumberTable:
            line 23: 0
      public void apply$mcVI$sp(int);
        descriptor: (I)V
        flags: ACC_PUBLIC
        Code:
          stack=3, locals=2, args_size=2
             0: aload_0
             1: getfield #29 // Field counter$1:Lscala/runtime/IntRef;
             4: aload_0
             5: getfield #29 // Field counter$1:Lscala/runtime/IntRef;
             8: getfield #34 // Field scala/runtime/IntRef.elem:I
            11: iload_1
            12: iadd
            13: putfield #34 // Field scala/runtime/IntRef.elem:I
            16: return
          LocalVariableTable:
            Start Length Slot Name Signature
                0     17    0 this Lcom/hx/test/Test25Increment$$anonfun$main$1;
                0     17    1 x I
          LineNumberTable:
            line 23: 0
      public final java.lang.Object apply(java.lang.Object);
        descriptor: (Ljava/lang/Object;)Ljava/lang/Object;
        flags: ACC_PUBLIC, ACC_FINAL, ACC_BRIDGE, ACC_SYNTHETIC
        Code:
          stack=2, locals=2, args_size=2
             0: aload_0
             1: aload_1
             2: invokestatic #41 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
             5: invokevirtual #43 // Method apply:(I)V
             8: getstatic #49 // Field scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
            11: areturn
          LocalVariableTable:
            Start Length Slot Name Signature
                0     12    0 this Lcom/hx/test/Test25Increment$$anonfun$main$1;
                0     12    1 v1 Ljava/lang/Object;
          LineNumberTable:
            line 23: 0
      public com.hx.test.Test25Increment$$anonfun$main$1(scala.runtime.IntRef);
        descriptor: (Lscala/runtime/IntRef;)V
        flags: ACC_PUBLIC
        Code:
          stack=2, locals=2, args_size=2
             0: aload_0
             1: aload_1
             2: putfield #29 // Field counter$1:Lscala/runtime/IntRef;
             5: aload_0
             6: invokespecial #56 // Method scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
             9: return
          LocalVariableTable:
            Start Length Slot Name Signature
                0     10    0 this Lcom/hx/test/Test25Increment$$anonfun$main$1;
                0     10    1 counter$1 Lscala/runtime/IntRef;
          LineNumberTable:
            line 23: 0
    }
    SourceFile: "Test25Increment.scala"
    EnclosingMethod: #9.#12 // com.hx.test.Test25Increment$.main
    InnerClasses:
      public final #2; //class com/hx/test/Test25Increment$$anonfun$main$1
    Error: unknown attribute
      ScalaInlineInfo: length = 0x18
       01 01 00 04 00 34 00 35 01 00 15 00 14 01 00 13 00 14 01 00 13 00 23 01
    Error: unknown attribute
      Scala: length = 0x0

     

    Let's look at its constructor

    All it does is initialize the current object's counter$1 field and then call the superclass constructor

    public com.hx.test.Test25Increment$$anonfun$main$1(scala.runtime.IntRef);
      descriptor: (Lscala/runtime/IntRef;)V
      flags: ACC_PUBLIC
      Code:
        stack=2, locals=2, args_size=2
           0: aload_0
           1: aload_1
           2: putfield #29 // Field counter$1:Lscala/runtime/IntRef;
           5: aload_0
           6: invokespecial #56 // Method scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
           9: return
        LocalVariableTable:
          Start Length Slot Name Signature
              0     10    0 this Lcom/hx/test/Test25Increment$$anonfun$main$1;
              0     10    1 counter$1 Lscala/runtime/IntRef;
        LineNumberTable:
          line 23: 0

     

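    One odd thing here: javap -c -v does not show a counter$1 field even though the code uses it, while HSDB does show counter$1, and the class dumped from HSDB is identical to the one the compiler produced. The likely explanation is that counter$1 is a private field, and javap by default prints only package, protected and public members; running javap -p -c -v should list it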
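    Whether a private field shows up depends on which view of the class is requested, and Java reflection draws the same line. In the sketch below, CapturingFunction is a hypothetical class whose constructor parameter is kept in a private field, much like counter$1: Class.getFields (public fields only) does not list it, while Class.getDeclaredFields (every field the class declares, private included) does. That scalac names the field counter, after the constructor parameter, is an assumption of this sketch.

```scala
import scala.runtime.{AbstractFunction1, IntRef}

// Hypothetical closure-like class; `counter` is used in apply, so scalac keeps it as a private field
final class CapturingFunction(counter: IntRef)
    extends AbstractFunction1[Int, Unit] with Serializable {
  def apply(x: Int): Unit = counter.elem += x
}

object FieldVisibility {
  // Public fields only (including inherited ones)
  def publicFieldNames(c: Class[_]): Set[String] = c.getFields.map(_.getName).toSet

  // Every field declared by this class, whatever its access modifier
  def declaredFieldNames(c: Class[_]): Set[String] = c.getDeclaredFields.map(_.getName).toSet

  def main(args: Array[String]): Unit = {
    val c = classOf[CapturingFunction]
    println(publicFieldNames(c))
    println(declaredFieldNames(c))
  }
}
```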

     

    The constructor of the superclass AbstractFunction1$mcVI$sp is shown below

    It calls the constructor of its superclass AbstractFunction1, then calls the initializer Function1$mcVI$sp$class.$init$

    public scala.runtime.AbstractFunction1$mcVI$sp();
      descriptor: ()V
      flags: ACC_PUBLIC
      Code:
        stack=1, locals=1, args_size=1
           0: aload_0
           1: invokespecial #12 // Method scala/runtime/AbstractFunction1."<init>":()V
           4: aload_0
           5: invokestatic #18 // Method scala/Function1$mcVI$sp$class.$init$:(Lscala/Function1$mcVI$sp;)V
           8: return
        LineNumberTable:
          line 12: 0
        LocalVariableTable:
          Start Length Slot Name Signature
              0      9    0 this Lscala/runtime/AbstractFunction1$mcVI$sp;

     

    The constructor of the superclass AbstractFunction1 is shown below

    It calls the java.lang.Object constructor, then calls the initializer Function1$class.$init$

    public scala.runtime.AbstractFunction1();
      descriptor: ()V
      flags: ACC_PUBLIC
      Code:
        stack=1, locals=1, args_size=1
           0: aload_0
           1: invokespecial #159 // Method java/lang/Object."<init>":()V
           4: aload_0
           5: invokestatic #163 // Method scala/Function1$class.$init$:(Lscala/Function1;)V
           8: return
        LineNumberTable:
          line 12: 0
        LocalVariableTable:
          Start Length Slot Name Signature
              0      9    0 this Lscala/runtime/AbstractFunction1;
      Signature: #157 // ()V

     

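    To sum up the flow: "x => counter += x" is compiled into the class com/hx/test/Test25Increment$$anonfun$main$1. Through AbstractFunction1$mcVI$sp it implements Function1[Int, Unit] (in erased bytecode form, an apply(Object) that returns BoxedUnit), and through scala.Serializable it implements java.io.Serializable. Because the function's computation depends on the outer variable counter, the class keeps a reference to counter, more precisely to the IntRef holding it, in its counter$1 field

    "rdd.foreach(x => counter += x)" is compiled into code that creates an instance of com/hx/test/Test25Increment$$anonfun$main$1 at runtime; since the computation depends on counter, counter is passed in to initialize that instance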

    On its way through SparkContext.runJob, the function gets wrapped in two outer layers, func and cleanedFunc
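    The layering can be sketched without Spark. This is a simplified sketch, assuming the shape of Spark 2.4's RDD.foreach (cleanF = sc.clean(f), then iter => iter.foreach(cleanF)) and SparkContext.runJob (cleanedFunc = clean(func), then (ctx, it) => cleanedFunc(it)). foreachLike, runJobLike and FakeTaskContext are hypothetical names, closure cleaning is reduced to a plain assignment, and a local loop replaces real tasks.

```scala
import scala.collection.mutable.ArrayBuffer

object WrappingSketch {
  // Hypothetical stand-in for org.apache.spark.TaskContext
  final case class FakeTaskContext(partitionId: Int)

  // Mirrors the shape of RDD.foreach: wrap the user function in an iterator function
  def foreachLike[T](partitions: Seq[Seq[T]])(f: T => Unit): Unit = {
    val cleanF = f // Spark runs sc.clean(f) here
    runJobLike(partitions, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  // Mirrors the shape of SparkContext.runJob: wrap func once more so it also takes a task context
  def runJobLike[T, U](partitions: Seq[Seq[T]], func: Iterator[T] => U): Seq[U] = {
    val cleanedFunc = func // Spark runs clean(func) here
    val taskFunc = (ctx: FakeTaskContext, it: Iterator[T]) => cleanedFunc(it)
    // A local loop stands in for ResultTasks running on executors
    partitions.zipWithIndex.map { case (p, i) => taskFunc(FakeTaskContext(i), p.iterator) }
  }

  def demo(): List[Int] = {
    val seen = ArrayBuffer.empty[Int]
    foreachLike(Seq(Seq(1, 2), Seq(3)))(x => seen += x)
    seen.toList
  }
}
```

    Because nothing is serialized here, demo() sees every update; in Spark, the outermost taskFunc is what ends up inside taskBinary.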

    Then, when DAGScheduler builds taskBinary, it serializes the rdd together with the func that contains the com/hx/test/Test25Increment$$anonfun$main$1 instance, and broadcasts the result

     

    When the Executor runs the ResultTask, it deserializes the rdd and the func containing com/hx/test/Test25Increment$$anonfun$main$1 from taskBinary

    After deserialization, the counter$1 field of the com/hx/test/Test25Increment$$anonfun$main$1 instance is a deep copy of the driver program's counter, so the changes the instance makes to its own counter$1 do not affect the driver program's counter
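    The copy semantics can be reproduced with the same numbers as the test code. A minimal sketch, assuming plain Java serialization in place of Spark's closure serializer and a hypothetical stand-in class TaskCounterFunc: the function is serialized once, as taskBinary is, and each of the two simulated tasks (partitions 1 to 50 and 51 to 100, as from parallelize(1 to 100, 2)) deserializes its own copy before running.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import scala.runtime.{AbstractFunction1, IntRef}

// Hypothetical stand-in for Test25Increment$$anonfun$main$1
final class TaskCounterFunc(val counter: IntRef)
    extends AbstractFunction1[Int, Unit] with Serializable {
  def apply(x: Int): Unit = counter.elem += x
}

object PerTaskCopies {
  // Plain Java serialization, assumed here in place of Spark's closure serializer
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  /** Returns (driver's counter after the "job", counter value left in each task's copy). */
  def run(): (Int, Seq[Int]) = {
    val driverCounter = new IntRef(2)                              // var counter = 2
    val taskBinary = serialize(new TaskCounterFunc(driverCounter)) // serialized once on the "driver"
    val partitions = Seq(1 to 50, 51 to 100)                       // two partitions of 1 to 100

    val perTask = partitions.map { part =>
      val taskFunc = deserialize[TaskCounterFunc](taskBinary) // every task builds its own copy
      part.foreach(taskFunc)
      taskFunc.counter.elem
    }
    (driverCounter.elem, perTask)
  }

  def main(args: Array[String]): Unit = println(run()) // (2,List(1277, 3777))
}
```

    Each copy starts from 2 and adds only its own partition (2 + 1275 and 2 + 3775), while the driver's IntRef is never touched.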

    That is why the final printed value is counter's original value, 2
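    For this kind of global aggregation the official guide recommends an Accumulator (in Spark 2.x, for example, sc.longAccumulator, with add called in the tasks and value read on the driver), or an action such as reduce that returns its result to the driver. The plain-Scala sketch below imitates only the underlying idea, without Spark: each simulated task works on its own partition and returns a partial result, and only the driver updates counter. MergeOnDriver and runTask are hypothetical names.

```scala
object MergeOnDriver {
  // Hypothetical per-task work: a partial sum over one partition
  def runTask(partition: Seq[Int]): Long = partition.foldLeft(0L)(_ + _)

  def run(): Long = {
    var counter = 2L
    val partitions: Seq[Seq[Int]] = Seq(1 to 50, 51 to 100)
    // In Spark, these partial results would travel back to the driver with the task results
    val partials = partitions.map(runTask)
    // The only mutation of counter happens on the "driver"
    partials.foreach(p => counter += p)
    counter
  }

  def main(args: Array[String]): Unit = println(run()) // 5052
}
```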

     

    Of course, this is just one concrete example; the Understanding closures section above describes the behavior more abstractly and more precisely

     

     

    Invoking com/hx/test/Test25Increment$$anonfun$main$1

    Let's look at the other methods in com/hx/test/Test25Increment$$anonfun$main$1

    public final void apply(int);
      Code:
         0: aload_0
         1: iload_1
         2: invokevirtual #23 // Method apply$mcVI$sp:(I)V
         5: return
    public void apply$mcVI$sp(int);
      Code:
         0: aload_0
         1: getfield #29 // Field counter$1:Lscala/runtime/IntRef;
         4: aload_0
         5: getfield #29 // Field counter$1:Lscala/runtime/IntRef;
         8: getfield #34 // Field scala/runtime/IntRef.elem:I
        11: iload_1
        12: iadd
        13: putfield #34 // Field scala/runtime/IntRef.elem:I
        16: return
    public final java.lang.Object apply(java.lang.Object);
      Code:
         0: aload_0
         1: aload_1
         2: invokestatic #41 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
         5: invokevirtual #43 // Method apply:(I)V
         8: getstatic #49 // Field scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
        11: areturn

     

    Combining this with a breakpoint inside com/hx/test/Test25Increment$$anonfun$main$1,

    we can see that the first apply to be called is apply(java.lang.Object), which calls apply(int), which in turn calls apply$mcVI$sp(int) to do the actual work

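    apply(java.lang.Object) is a typical bridge method: after erasure, the generic Function1.apply takes and returns Object, so the compiler generates this ACC_BRIDGE, ACC_SYNTHETIC method to unbox the argument and forward to apply(int)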
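    The bridge can also be observed with reflection. In the sketch below, SumFunction is a hypothetical Int => Unit function class; the assumption is that scalac emits for it an apply(int) plus a bridge apply(Object), as in the javap output above, which java.lang.reflect.Method.isBridge can tell apart. Depending on the Scala version, specialization may add further methods such as apply$mcVI$sp, so the code filters on the exact name apply. callErased is a generic call site, so it invokes the function through the erased apply(Object) signature.

```scala
import scala.runtime.AbstractFunction1

// Hypothetical Int => Unit function class, analogous to Test25Increment$$anonfun$main$1
final class SumFunction extends AbstractFunction1[Int, Unit] with Serializable {
  var total: Int = 0
  def apply(x: Int): Unit = total += x
}

object BridgeInspect {
  // (parameter types, isBridge) for each method named "apply" declared by the class
  def applyMethods(c: Class[_]): Seq[(String, Boolean)] =
    c.getDeclaredMethods.toSeq
      .filter(_.getName == "apply")
      .map(m => (m.getParameterTypes.map(_.getName).mkString(","), m.isBridge))

  // Generic call site: A is erased, so this call uses the apply(Object) signature
  def callErased[A](f: A => Unit, a: A): Unit = f(a)

  def main(args: Array[String]): Unit = {
    applyMethods(classOf[SumFunction]).foreach(println)
    val f = new SumFunction
    callErased(f, 5)
    println(f.total) // 5
  }
}
```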

     

     

     

     

     
