封云亭 发表于 2016-12-7 06:54:48

重温hadoop(3)--序列化

  Hadoop的序列化机制特征:


[*]紧凑:带宽是hadoop集群中最稀缺的资源,一个紧凑的序列化机制可以充分利用带宽。
[*]快速:mapreduce会大量的使用序列化机制。因此,要尽可能减少序列化开销。
[*]可扩张:序列化机制需要可定制
[*]互操作:可以支持不同开发语言间的通信。
      Java 自带的序列化机制,会把要序列化的类的类签名、类的所有非暂态(non-transient)和非静态成员的值,以及所有父类的信息都写入流中,导致序列化后的对象过于庞大,可能比原始数据大几十甚至上百倍。
     由上面的条件,hadoop自定义了序列化机制,引入org.apache.hadoop.io.Writable

/**
* A serializable object which implements a simple, efficient, serialization
* protocol, based on {@link DataInput} and {@link DataOutput}.
*
* <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
* framework implements this interface.</p>
*
* <p>Implementations typically implement a static <code>read(DataInput)</code>
* method which constructs a new instance, calls {@link #readFields(DataInput)}
* and returns the instance.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
*   public class MyWritable implements Writable {
*       // Some data   
*       private int counter;
*       private long timestamp;
*      
*       public void write(DataOutput out) throws IOException {
*         out.writeInt(counter);
*         out.writeLong(timestamp);
*       }
*      
*       public void readFields(DataInput in) throws IOException {
*         counter = in.readInt();
*         timestamp = in.readLong();
*       }
*      
*       public static MyWritable read(DataInput in) throws IOException {
*         MyWritable w = new MyWritable();
*         w.readFields(in);
*         return w;
*       }
*   }
* </pre></blockquote></p>
*/
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
* Writes (serializes) this object into the given output stream.
* @param out <code>DataOutput</code> to serialize this object into.
* @throws IOException if the underlying stream fails
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
* Reads (deserializes) this object from the stream; for efficiency,
* callers should re-use an existing instance where possible.
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deserialize this object from.
* @throws IOException if the underlying stream fails
*/
void readFields(DataInput in) throws IOException;
}
      hadoop序列化机制还包括几个重要的接口org.apache.hadoop.io.WritableComparable,org.apache.hadoop.io.WritableComparator,RawComparator
  org.apache.hadoop.io.WritableComparable:继承java.lang.Comparable,来提供类型比较。

 
  org.apache.hadoop.io.RawComparator:继承java.util.Comparator,允许去比较未被序列化为对象的记录,省去了创建对象的所有开销。
  java.util.Comparator和的java.lang.Comparable区别可见 http://lavasoft.blog.iyunv.com/62575/68380
  每个java基本类型都对应Writable封装。
  ObjectWritable 可用于需要把不同类型的对象序列化到同一个字段的场景,也用于 Hadoop 远程过程调用(RPC)中参数的序列化和反序列化。
  ObjectWritable 的实现:
  有三个成员变量:被封装的对象实例 instance、该对象运行时类的 Class 对象,以及 Configuration 对象。
  ObjectWritable 的 write 方法调用的是静态方法 ObjectWritable.writeObject(),该方法可以往 DataOutput 接口中写入各种 Java 对象。

/** Write a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
public static void writeObject(DataOutput out, Object instance,
Class declaredClass,
Configuration conf) throws IOException {
if (instance == null) {                     // null 空
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}
UTF8.writeString(out, declaredClass.getName()); // always write declared
if (declaredClass.isArray()) {                // array 数组
int length = Array.getLength(instance);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instance, i),
declaredClass.getComponentType(), conf);
}
} else if (declaredClass == String.class) {   // String 字符串
UTF8.writeString(out, (String)instance);
} else if (declaredClass.isPrimitive()) {   // primitive type 基本类型
if (declaredClass == Boolean.TYPE) {      // boolean
out.writeBoolean(((Boolean)instance).booleanValue());
} else if (declaredClass == Character.TYPE) { // char
out.writeChar(((Character)instance).charValue());
} else if (declaredClass == Byte.TYPE) {    // byte
out.writeByte(((Byte)instance).byteValue());
} else if (declaredClass == Short.TYPE) {   // short
out.writeShort(((Short)instance).shortValue());
} else if (declaredClass == Integer.TYPE) { // int
out.writeInt(((Integer)instance).intValue());
} else if (declaredClass == Long.TYPE) {    // long
out.writeLong(((Long)instance).longValue());
} else if (declaredClass == Float.TYPE) {   // float
out.writeFloat(((Float)instance).floatValue());
} else if (declaredClass == Double.TYPE) {// double
out.writeDouble(((Double)instance).doubleValue());
} else if (declaredClass == Void.TYPE) {    // void
} else {
throw new IllegalArgumentException("Not a primitive: "+declaredClass);
}
} else if (declaredClass.isEnum()) {         // enum 枚举
UTF8.writeString(out, ((Enum)instance).name());
} else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable Writable的子类
UTF8.writeString(out, instance.getClass().getName());
((Writable)instance).write(out);
} else {
throw new IOException("Can't write: "+instance+" as "+declaredClass);
}
}
  与 writeObject 对应,反序列化时调用的是 org.apache.hadoop.io.ObjectWritable.readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
     

/** Read a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. */
@SuppressWarnings("unchecked")
public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
throws IOException {
String className = UTF8.readString(in);
Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
if (declaredClass == null) {
try {
declaredClass = conf.getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("readObject can't find class " + className, e);
}
}   
Object instance;
if (declaredClass.isPrimitive()) {            // primitive types
if (declaredClass == Boolean.TYPE) {             // boolean
instance = Boolean.valueOf(in.readBoolean());
} else if (declaredClass == Character.TYPE) {    // char
instance = Character.valueOf(in.readChar());
} else if (declaredClass == Byte.TYPE) {         // byte
instance = Byte.valueOf(in.readByte());
} else if (declaredClass == Short.TYPE) {      // short
instance = Short.valueOf(in.readShort());
} else if (declaredClass == Integer.TYPE) {      // int
instance = Integer.valueOf(in.readInt());
} else if (declaredClass == Long.TYPE) {         // long
instance = Long.valueOf(in.readLong());
} else if (declaredClass == Float.TYPE) {      // float
instance = Float.valueOf(in.readFloat());
} else if (declaredClass == Double.TYPE) {       // double
instance = Double.valueOf(in.readDouble());
} else if (declaredClass == Void.TYPE) {         // void
instance = null;
} else {
throw new IllegalArgumentException("Not a primitive: "+declaredClass);
}
} else if (declaredClass.isArray()) {            // array
int length = in.readInt();
instance = Array.newInstance(declaredClass.getComponentType(), length);
for (int i = 0; i < length; i++) {
Array.set(instance, i, readObject(in, conf));
}
} else if (declaredClass == String.class) {      // String
instance = UTF8.readString(in);
} else if (declaredClass.isEnum()) {         // enum
instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
} else {                                    // Writable
Class instanceClass = null;
String str = "";
try {
str = UTF8.readString(in);
instanceClass = conf.getClassByName(str);
} catch (ClassNotFoundException e) {
throw new RuntimeException("readObject can't find class " + str, e);
}
//利用instanceClass创建WritableFactories
Writable writable = WritableFactories.newInstance(instanceClass, conf);
writable.readFields(in);
instance = writable;
if (instanceClass == NullInstance.class) {// null
declaredClass = ((NullInstance)instance).declaredClass;
instance = null;
}
}
if (objectWritable != null) {               // store values
objectWritable.declaredClass = declaredClass;
objectWritable.instance = instance;
}
return instance;
   

// Maps a Writable class to its registered WritableFactory,
// so newInstance() can construct instances without reflection.
private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY =
new HashMap<Class, WritableFactory>();
/**
 * Create a new instance of a class with a defined factory.
 *
 * <p>If a {@code WritableFactory} has been registered for {@code c}, it is
 * used to build the instance (and the configuration is injected when the
 * result is {@link Configurable}); otherwise the instance is created via
 * reflection.</p>
 *
 * @param c    the Writable class to instantiate
 * @param conf configuration passed to the new instance
 * @return a freshly constructed Writable
 */
public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
  WritableFactory f = WritableFactories.getFactory(c);
  if (f == null) {
    // No registered factory: fall back to plain reflection.
    return ReflectionUtils.newInstance(c, conf);
  }
  Writable w = f.newInstance();
  if (w instanceof Configurable) {
    ((Configurable) w).setConf(conf);
  }
  return w;
}
   
页: [1]
查看完整版本: 重温hadoop(3)--序列化