spark(kryo)、hadoop(writable)、jdk(serializable)-序列化

    科技2024-12-24  7

    一、SRC

    一个类在jvm中是有结构的,但即使是在jvm中,也是一堆数据。网络只能传文本,所以需要序列化和反序列化。 通过几种方式的序列化后文本输出到本地文件,可以对比下大小。

    二、jdk的序列化

    将类的上级所有层次、包名等信息都序列化,相当于把类的所有描述信息写在文本中传输。效率较低。 ObjectOutputStream底层就是调用的DataOutputStream,只不过把类的层级、包名和数据一起传了。

    所有类从技术上说都可以序列化,serializable接口是个标记接口,用来表示某个类可以被序列化。因为某些类不适合序列化,比如数据库连接对象,即使反序列化后也不能用,因为con对象依赖本地和数据库的连接。

    三、hadoop(writable)

    如果提前告知对方类的结构,则只需要传数据。效率会高。MR就是这种思路,所以MR用的是xxxWritable、Text的接口,没有用serializable,发接双方的类在driver/job的配置中都已经提高声明好了。类可以自定义,但必须实现hadoop提供的Writable接口,并且要自定义序列化和反序列化的方法。

    class TestBean extends Writable{ var id: Int = 0 var name: String = "" /* * 这个就是序列化的方法,out是输出流,因为hadoop中类的信息不需要传输,所以只需要发送属性的值 */ override def write(out: DataOutput): Unit = { out.writeInt(this.id) out.writeUTF(this.name) } /* * 这个就是反序列化的方法,in是输入流,hadoop中会先通过反射创建一个对象,然后调用这个方法来给属性赋值 */ override def readFields(in: DataInput): Unit = { this.id = in.readInt() this.name = in.readUTF() } }

    四、jdk和hadoop的序列化文件大小对比

    jdk序列化后的文件远大于haddop序列化后的文件

    import java.io.DataOutputStream; import java.io.FileOutputStream; import java.io.ObjectOutputStream; public class 序列化对比 { public static void main(String[] args) throws Exception { /* * 数据长度 = (3*2 + 2) + 4 + (6 + 2) + 8 = 28 */ Person p = new Person("张三", 36, "female", 30000.0); /* * jdk序列化 * 文件111字节,因为除了数据,还要序列化元数据 */ ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("d:/person.jdk")); os.writeObject(p); os.close(); /* * hadoop序列化 * 文件28字节 */ DataOutputStream dout = new DataOutputStream(new FileOutputStream("d:/person.hadoop")); dout.writeUTF(p.getName()); dout.writeInt(p.getAge()); dout.writeUTF(p.getSex()); dout.writeDouble(p.getSalary()); dout.close(); /* * String类型的进行序列化时,长度为字节数组的长度 + 2 * 因为前2位设置为这个String的长度,String不像int,没有固定长度,为了避免跟后面的混淆,需要定长截取 */ DataOutputStream dout2 = new DataOutputStream(new FileOutputStream("d:/x.obj")); dout2.writeUTF("aaa"); dout2.writeInt(4); } } class Person { private String name; private int age; private String sex; private double salary; public Person() { } public Person(String name, int age, String sex, double salary) { this.name = name; this.age = age; this.sex = sex; this.salary = salary; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } }

    五、spark(kryo)

    kryo、avro都是独立的序列化框架。这2个spark都集成了。avro可以跨平台。

    1. spark默认使用jdk的序列化,显然,效率低。

    2. spark在共享变量、shuffle、cache时都需要序列化。

    Kryo序列化机制,一旦启用以后,会生效的三个地方:

    算子函数中使用到的外部变量 算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。shuffle 在进行stage间的task的shuffle操作时,节点与节点之间的task会互相大量通过网络拉取和传输文件,此时,这些数据既然通过网络传输,也是可能要序列化的,就会使用Kryo、

    3. usage

    https://blog.csdn.net/sinadrew/article/details/80457854

    val conf = new SparkConf() conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 或者 System.setProperty("spark.serializer","org.apache.spark.serializer.KryoSerializer")

    只需要如上设置,driver中的共享变量都会自动使用Kryo进行序列化,自然也就不会报Serializable的错。但这样还是会序列化类的元信息,会极大的增加网络IO。此时可以使用 registerKryoClasses(Array(classOf[BloomFilter],classOf[String]))来显式注册需要kryo序列化的类型,实现Class元数据的共享。

    object SparkKryo序列化 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .set("spark.serializer",classOf[KryoSerializer].getName) .registerKryoClasses(Array(classOf[BloomFilter],classOf[String])) } }

    4. 测试kryo的大小

    <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>27.0-jre</version> </dependency> <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo-shaded</artifactId> <version>4.0.2</version> </dependency> </dependencies> import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Output import org.apache.hadoop.util.bloom.BloomFilter import org.apache.spark.SparkConf import org.apache.spark.serializer.KryoSerializer object SparkKryo序列化 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .set("spark.serializer",classOf[KryoSerializer].getName) .registerKryoClasses(Array(classOf[BloomFilter],classOf[String])) /* * 数据长度 = (3*2 + 2) + 4 + (6 + 2) + 8 = 28 */ val p = new Person("张三", 36, "female", 30000.0) val kryo = new Kryo() val output = new Output(1024) /* * 类元数据和数据一起序列化 */ kryo.writeClassAndObject(output,p)// 顾名思义,把数据和类元数据一起写 val byteArr1 = output.toBytes println(byteArr1.length) // 33,带有类的描述信息 output.clear() /* * 只序列化数据 */ kryo.register(classOf[Person]) kryo.writeObject(output,p) val byteArr2 = output.toBytes println(byteArr2.length) // 25,只有数据,没有类的描述信息,比用DataOutputStream直接写数据还要短!!! output.clear() } }

    六、3种的对比

    jdk是通用的,方便,效率低

    hadoop的writable是定制的,需要导包,只能在mr体系内用。由开发者自定义序列化的细节。

    kryo也是通用的,spark默认集成,不需要导包,也可以共享类的结构信息。

    avro可以跨平台。

    Processed: 0.040, SQL: 8