一个类在jvm中是有结构的,但即使是在jvm中,也是一堆数据。网络只能传文本,所以需要序列化和反序列化。 通过几种方式的序列化后文本输出到本地文件,可以对比下大小。
将类的上级所有层次、包名等信息都序列化,相当于把类的所有描述信息写在文本中传输。效率较低。 ObjectOutputStream底层就是调用的DataOutputStream,只不过把类的层级、包名和数据一起传了。
所有类从技术上说都可以序列化,serializable接口是个标记接口,用来表示某个类可以被序列化。因为某些类不适合序列化,比如数据库连接对象,即使反序列化后也不能用,因为con对象依赖本地和数据库的连接。
如果提前告知对方类的结构,则只需要传数据。效率会高。MR就是这种思路,所以MR用的是xxxWritable、Text的接口,没有用serializable,发接双方的类在driver/job的配置中都已经提高声明好了。类可以自定义,但必须实现hadoop提供的Writable接口,并且要自定义序列化和反序列化的方法。
class TestBean extends Writable{ var id: Int = 0 var name: String = "" /* * 这个就是序列化的方法,out是输出流,因为hadoop中类的信息不需要传输,所以只需要发送属性的值 */ override def write(out: DataOutput): Unit = { out.writeInt(this.id) out.writeUTF(this.name) } /* * 这个就是反序列化的方法,in是输入流,hadoop中会先通过反射创建一个对象,然后调用这个方法来给属性赋值 */ override def readFields(in: DataInput): Unit = { this.id = in.readInt() this.name = in.readUTF() } }jdk序列化后的文件远大于haddop序列化后的文件
import java.io.DataOutputStream; import java.io.FileOutputStream; import java.io.ObjectOutputStream; public class 序列化对比 { public static void main(String[] args) throws Exception { /* * 数据长度 = (3*2 + 2) + 4 + (6 + 2) + 8 = 28 */ Person p = new Person("张三", 36, "female", 30000.0); /* * jdk序列化 * 文件111字节,因为除了数据,还要序列化元数据 */ ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("d:/person.jdk")); os.writeObject(p); os.close(); /* * hadoop序列化 * 文件28字节 */ DataOutputStream dout = new DataOutputStream(new FileOutputStream("d:/person.hadoop")); dout.writeUTF(p.getName()); dout.writeInt(p.getAge()); dout.writeUTF(p.getSex()); dout.writeDouble(p.getSalary()); dout.close(); /* * String类型的进行序列化时,长度为字节数组的长度 + 2 * 因为前2位设置为这个String的长度,String不像int,没有固定长度,为了避免跟后面的混淆,需要定长截取 */ DataOutputStream dout2 = new DataOutputStream(new FileOutputStream("d:/x.obj")); dout2.writeUTF("aaa"); dout2.writeInt(4); } } class Person { private String name; private int age; private String sex; private double salary; public Person() { } public Person(String name, int age, String sex, double salary) { this.name = name; this.age = age; this.sex = sex; this.salary = salary; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } }kryo、avro都是独立的序列化框架。这2个spark都集成了。avro可以跨平台。
Kryo序列化机制,一旦启用以后,会生效的三个地方:
算子函数中使用到的外部变量 算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。shuffle 在进行stage间的task的shuffle操作时,节点与节点之间的task会互相大量通过网络拉取和传输文件,此时,这些数据既然通过网络传输,也是可能要序列化的,就会使用Kryo、https://blog.csdn.net/sinadrew/article/details/80457854
val conf = new SparkConf() conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 或者 System.setProperty("spark.serializer","org.apache.spark.serializer.KryoSerializer")只需要如上设置,driver中的共享变量都会自动使用Kryo进行序列化,自然也就不会报Serializable的错。但这样还是会序列化类的元信息,会极大的增加网络IO。此时可以使用 registerKryoClasses(Array(classOf[BloomFilter],classOf[String]))来显式注册需要kryo序列化的类型,实现Class元数据的共享。
object SparkKryo序列化 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .set("spark.serializer",classOf[KryoSerializer].getName) .registerKryoClasses(Array(classOf[BloomFilter],classOf[String])) } }