传统的计算机系统通过I/O操作与外界进行交流,Hadoop的I/O由传统的I/O系统发展而来,但又有些不同,Hadoop需要处理P、T级别的数据,所以在org.apache.hadoop.io包中包含了一些面向海量数据处理的基本输入输出工具。
1 序列化
对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象。“将一个对象编码成一个字节流”称为序列化该对象(Serializing);相反的处理过程称为反序列化(Deserializing)。
序列化有三种主要的用途:
- 作为一种持久化格式:一个对象被序列化以后,它的编码可以被存储到磁盘上,供以后反序列化用。
- 作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传递到另一个虚拟机上。
- 作为一种拷贝、克隆(clone)机制:将对象序列化到内存的缓存区中,然后通过反序列化,可以得到一个对已存对象进行深拷贝的新对象。
在分布式数据处理中,主要使用上面提到的前两种功能:数据持久化和通信数据格式。
在分析Hadoop的序列化机制前,先介绍一下Java内建的序列化机制。
2.Java内建的序列化机制
Java序列化机制将对象转换为连续的byte数据,这些数据可以在日后还原为原先的对象状态。在Java中,使一个类的实例可被序列化非常简单,只需要在类声明中加入implements Serializable即可。Serializable接口是一个标志,不具有任何成员函数,其定义如下:
public class Object interface Serializable {
}
Serializable接口没有任何方法,所以不需要对类进行修改,Block类通过声明它实现了Serializable接口,立即可以获得Java提供的序列化功能。代码如下:
public class Block implements Writable, Comparable<Block>, Serializable
由于序列化主要应用在与I/O相关的一些操作上,其实现是通过一对输入/输出流来实现的。如果想对某个对象执行序列化动作,可以在某种OutputStream对象(后面还会讨论Java的流)的基础上创建一个对象流ObjectOutputStream对象,然后调用writeObject()就可达到目的。
writeObject()方法负责写入实现了Serializable接口对象的状态信息,输出数据将被送至该OutputStream。多个对象的序列化可以在ObjectOutputStream对象上多次调用writeObject(),分别写入这些对象。下面是序列化一个Block对象的例子:
Block block1=new Block(7806259420524417791L, 39447755L, 56736651L);
Block block2=new Block(5547099594945187683L, 67108864L, 56736828L);
..................
ByteArrayOutputStream out=new ByteArrayOutputStream();
//在ByteArrayOutputStream的基础上创建ObjectOutputStream
ObjectOutputStream objOut=new ObjectOutputStream(out);
//对block进行序列化
objOut.writeObject(block1);
对于Java基本类型的序列化,ObjectOutputStream提供了writeBoolean()、writeByte()等方法。
输入过程类似,将InputStream包装在ObjectInputStream中并调用readObject(),该方法返回一个指向向上转型后的Object的引用,通过向下转型,就可以得到正确结果。读取对象时,必须要小心地跟踪存储的对象的数量、顺序以及它们的类型。
Java的序列化机制非常“聪明”,JavaDoc中对ObjectOutputStream的writeObject()方法的说明是:“……这个对象的类、类签名、类的所有非暂态和非静态成员的值,以及它所有的父类都要被写入”,序列化机制会自动访问对象的父类,以保证对象内容的一致性。同时,序列化机制不仅存储对象在内存中的原始数据,还会追踪通过该对象可以到达的其他对象的内部数据,并描述所有这些对象是如何被链接起来的。对于复杂的情形,Java序列化机制也能应付自如:在输出objectA和objectB时,不会重复保存对象的序列化结果(如objectC,即objectC只被序列化一次);对于循环引用的对象,序列化也不会陷入死循环(如图3-1右图的情形)。
但是,序列化以后的对象在尺寸上有点过于充实了,以Block类为例,它只包含3个长整数,但是它的序列化结果竟然有112字节,而BlockMetaDataInfo其实只多了一个long型的成员变量,输出结果已经膨胀到190字节。包含3个长整数的Block对象的序列化结果如下:
AC ED 00 05 73 72 00 1C 6F 72 67 2E 68 61 64 6F ....sr.. org.hado
6F 70 69 6E 74 65 72 6E 61 6C 2E 73 65 72 2E 42 opintern al.ser.B
6C 6F 63 6B E7 80 E3 D3 A6 B6 22 53 02 00 03 4A lock.... .."S...J
00 07 62 6C 6F 63 6B 49 64 4A 00 0F 67 65 6E 65 ..blockI dJ..gene
72 61 74 69 6F 6E 53 74 61 6D 70 4A 00 08 6E 75 rationSt ampJ..nu
6D 42 79 74 65 73 78 70 6C 55 67 95 68 E7 92 FF mBytesxp lUg.h...
00 00 00 00 03 61 BB 8B 00 00 00 00 02 59 EC CB .....a.. .....Y..
仔细看Block的输出会发现,序列化的结果中包含了大量与类相关的信息。Java的序列过程在《Java Object Serialization Specification》中规范,以Block为例,其结果的前两个字节是魔数(Magic Number)“AC ED”;后续两个字节是序列化格式的版本号,现在使用的版本号是5;接下来是类的描述信息,包括类的版本ID、是否实现writeObject()和readObject()方法等信息,对于拥有超类的类(如BlockMetaDataInfo),超类的信息也会递归地被保存下来;这些信息都写入OutputStream对象后,接下来才是对象的数据。在这个过程中,序列化输出中保存了大量的附加信息,导致序列化结果膨胀,对于需要保存和处理大规模数据的Hadoop来说,需要一个新的序列化机制。
3.Hadoop序列化机制
和Java序列化机制不同(在对象流ObjectOutputStream对象上调用writeObject()方法),Hadoop的序列化机制通过调用对象的write()方法(它带有一个类型为DataOutput的参数),将对象序列化到流中。反序列化的过程也是类似,通过对象的readFields(),从流中读取数据。值得一提的是,Java序列化机制中,反序列化过程会不断地创建新的对象,但在Hadoop的序列化机制的反序列化过程中,用户可以复用对象:如,在Block的某个对象上反复调用readFields(),可以在同一个对象上得到多个反序列化的结果,而不是多个反序列化的结果对象(对象被复用了),这减少了Java对象的分配和回收,提高了应用的效率。
public static void main(String[] args) {
try {
Block block1=new Block(7806259420524417791L, 39447755L, 56736651L);
.........
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout=new DataOutputStream(bout);
block1.write(dout); //序列化对象到输出流dout中
dout.close();
System.out.println(……);
SerializationExample.print16(out.toByteArray(), bout.size());
}
........
}
由于Block对象序列化时只输出了3个长整数, block1的序列化结果一共有24字节,如下所示。和Java的序列化机制的输出结果对比,Hadoop的序列化结果紧凑而且快速。
AC ED 00 05 73 72 00 28 6F 72 67 2E 68 61 64 6F ....sr.( org.hado
6F 70 69 6E 74 65 72 6E
4.Hadoop序列化机制的特征
对于处理大规模数据的Hadoop平台,其序列化机制需要具有如下特征:
- 紧凑:由于带宽是Hadoop集群中最稀缺的资源,一个紧凑的序列化机制可以充分利用数据中心的带宽。
- 快速:在进程间通信(包括MapReduce过程中涉及的数据交互)时会大量使用序列化机制,因此,必须尽量减少序列化和反序列化的开销。
- 可扩展:随着系统的发展,系统间通信的协议会升级,类的定义会发生变化,序列化机制需要支持这些升级和变化。
- 互操作:可以支持不同开发语言间的通信,如C++和Java间的通信。这样的通信,可以通过文件(需要精心设计文件的格式)或者后面介绍的IPC机制实现。
Java的序列化机制虽然强大,却不符合上面这些要求。Java Serialization将每个对象的类名写入输出流中,这导致了Java序列化对象需要占用比原对象更多的存储空间。同时,为了减少数据量,同一个类的对象的序列化结果只输出一份元数据,并通过某种形式的引用,来共享元数据。引用导致对序列化后的流进行处理的时候,需要保持一些状态。想象如下一种场景,在一个上百G的文件中,反序列化某个对象,需要访问文件中前面的某一个元数据,这将导致这个文件不能切割,并通过MapReduce来处理。同时,Java序列化会不断地创建新的对象,对于MapReduce应用来说,不能重用对象,在已有对象上进行反序列化操作,而是不断创建反序列化的各种类型记录,这会带来大量的系统开销。
5.Hadoop Writable机制
为了支持以上这些特性,Hadoop引入org.apache.hadoop.io.Writable接口,作为所有可序列化对象必须实现的接口。Writable机制紧凑、快速(但不容易扩展到Java以外的语言,如C、Python等)。和java.io.Serializable不同,Writable接口不是一个说明性接口,它包含两个方法:
public interface Writable {
/**
* 输出(序列化)对象到流中
* @param out DataOuput流,序列化的结果保存在流中
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* 从流中读取(反序列化)对象
* 为了效率,请尽可能复用现有的对象
* @param in DataInput流,从该流中读取数据
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
Writable.write()方法用于将对象状态写入二进制的DataOutput中,反序列化的过程由readFields()从DataInput流中读取状态完成。下面是一个例子:
public class Block implements Writable, Comparable<Block>, Serializable {
......
private long blockId;
private long numBytes;
private long generationStamp;
......
public void write(DataOutput out) throws IOException {
out.writeLong(blockId);
out.writeLong(numBytes);
out.writeLong(generationStamp);
}
public void readFields(DataInput in) throws IOException {
this.blockId = in.readLong();
this.numBytes = in.readLong();
this.generationStamp = in.readLong();
if (numBytes < 0) {
throw new IOException("Unexpected block size: " + numBytes);
}
}
......
}
这个例子使用的是前面分析Java序列化机制的Block类,Block实现了Writable接口,即需要实现write()方法和readFields()方法,这两个方法的实现都很简单:Block有三个成员变量,write()方法简单地把这三个变量写入流中,而readFields()则从流中依次读入这些数据,并做必要的检查。