[Experience sharing] How to define custom types in Hadoop

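Posted on 2016-12-8 07:43:38
These are my notes on the Hadoop data types chapter, kept for later reference. I am writing them down as I learn, so this post will keep being updated.
The commonly used built-in Hadoop data types and their Java counterparts are as follows: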
Hadoop type        Java type    Description
BooleanWritable    boolean      Boolean
IntWritable        int          Integer
FloatWritable      float        Floating point (float)
DoubleWritable     double       Floating point (double)
ByteWritable       byte         Integer type (byte)
Text               String       String
ArrayWritable      Array        Array

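First, a clear definition of serialization (taken from Baidu Baike):
Serialization is the process of converting an object's state into a form that can be stored or transmitted. During serialization, the object writes its current state to temporary or persistent storage. Later, the object can be recreated by reading its state back from storage, that is, by deserializing it.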

A custom Hadoop type must implement the Writable interface, which is defined as follows:
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

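write: serializes the fields of this object to out.
readFields: deserializes the fields of this object from in.
The class also needs a no-argument constructor, because the framework creates an empty instance first and then calls readFields on it. By convention, implementations additionally provide a static read method that creates an instance with the no-argument constructor, calls readFields on it, and returns it.
For example: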
public static MyWritable read(DataInput in) throws IOException {
    MyWritable w = new MyWritable();
    w.readFields(in);
    return w;
}


Complete example from the official documentation:

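import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class MyWritable implements Writable {
    // Some data
    private int counter;
    private long timestamp;

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    // Deserialize the fields in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    // Convenience factory: create an instance and populate it from the stream.
    public static MyWritable read(DataInput in) throws IOException {
        MyWritable w = new MyWritable();
        w.readFields(in);
        return w;
    }
}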
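
The write/readFields contract can be tried out without a cluster by serializing to a byte array and reading it back. The following is only a minimal sketch under stated assumptions: it declares a local stand-in for the Writable interface so that it compiles with the JDK alone (in a real job you would import org.apache.hadoop.io.Writable instead), and the names Event and RoundTripDemo are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same two methods as org.apache.hadoop.io.Writable,
// declared here only so the sketch compiles without Hadoop on the classpath.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical custom type with the same two fields as MyWritable.
class Event implements Writable {
    int counter;
    long timestamp;

    Event() { }  // no-argument constructor, used before readFields is called

    Event(int counter, long timestamp) {
        this.counter = counter;
        this.timestamp = timestamp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written.
        counter = in.readInt();
        timestamp = in.readLong();
    }
}

class RoundTripDemo {
    // Serialize any Writable into a byte array.
    static byte[] toBytes(Writable w) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild an Event from bytes produced by toBytes.
    static Event fromBytes(byte[] bytes) {
        Event e = new Event();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            e.readFields(in);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return e;
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new Event(7, 1481154218000L));
        Event copy = fromBytes(bytes);
        // An int (4 bytes) plus a long (8 bytes) gives 12 bytes in total.
        System.out.println(bytes.length + " bytes, counter=" + copy.counter
                + ", timestamp=" + copy.timestamp);
    }
}
```

Nothing but the raw field values ends up in the stream, which is why the read order has to mirror the write order.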


WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.

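If the custom type is used as a key, it must implement the WritableComparable interface, which extends two interfaces: Comparable<T> and Writable.
The code is similar to the previous example; the main addition is the compareTo method: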
public int compareTo(MyWritableComparable o) {
    // Compare this object's value field with the other object's value field.
    int thisValue = this.value;
    int thatValue = o.value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
}
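
For a fuller picture, here is a small sketch of a complete key type. It rests on a few assumptions: Writable and WritableComparable are local stand-ins for the Hadoop interfaces so that the code runs on the JDK alone, and IntKey and KeySortDemo are hypothetical names. It also overrides equals and hashCode, since keys are commonly partitioned by hashCode and the result should be the same on every JVM.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-ins for the Hadoop interfaces (assumption: JDK-only sketch).
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface WritableComparable<T> extends Writable, Comparable<T> { }

// Hypothetical key type wrapping a single int.
class IntKey implements WritableComparable<IntKey> {
    private int value;

    IntKey() { }

    IntKey(int value) { this.value = value; }

    int get() { return value; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    @Override
    public int compareTo(IntKey other) {
        // Integer.compare avoids the overflow risk of "this.value - other.value".
        return Integer.compare(this.value, other.value);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof IntKey && ((IntKey) o).value == value;
    }

    @Override
    public int hashCode() {
        // Depends only on the field, so it is stable across JVM instances.
        return Integer.hashCode(value);
    }
}

class KeySortDemo {
    // Wrap raw ints in keys, sort them with compareTo, and unwrap again.
    static List<Integer> sortedValues(int... raw) {
        List<IntKey> keys = new ArrayList<>();
        for (int v : raw) {
            keys.add(new IntKey(v));
        }
        Collections.sort(keys);
        List<Integer> result = new ArrayList<>();
        for (IntKey k : keys) {
            result.add(k.get());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sortedValues(3, -1, 2)); // prints [-1, 2, 3]
    }
}
```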


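Special type: NullWritable
NullWritable is a special Writable whose serialized length is 0. Its write and readFields methods are empty: it neither reads from nor writes to the stream and serves only as a placeholder. In MapReduce, if you do not need the key or the value, you can declare it as NullWritable. NullWritable is an immutable singleton; the instance is obtained with NullWritable.get().
Special type: ObjectWritable
ObjectWritable is a general-purpose wrapper for Java primitive types, String, enums, Writable objects, null, and arrays of these. It is used to carry objects between client and server over Hadoop RPC, where the values exchanged can only be Java primitive types, String, or Writable types.
ObjectWritable writes the following to the stream:
the class name of the object, followed by the object's own serialized form.
Its serialization and deserialization methods are as follows: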
public void readFields(DataInput in) throws IOException {
    readObject(in, this, this.conf);
}

public void write(DataOutput out) throws IOException {
    writeObject(out, instance, declaredClass, conf);
}

public static void writeObject(DataOutput out, Object instance,
                               Class declaredClass,
                               Configuration conf) throws IOException {
    // A null object is represented by the nested NullInstance type
    if (instance == null) {                       // null
        instance = new NullInstance(declaredClass, conf);
        declaredClass = Writable.class;
    }
    // Write the declared class name first
    UTF8.writeString(out, declaredClass.getName()); // always write declared
    /*
     * If the wrapped object is an array, serialize it element by element
     * (the length, followed by the serialized form of each element),
     * calling writeObject recursively for each element.
     */
    if (declaredClass.isArray()) {                // array
        int length = Array.getLength(instance);
        out.writeInt(length);
        for (int i = 0; i < length; i++) {
            writeObject(out, Array.get(instance, i),
                        declaredClass.getComponentType(), conf);
        }
    // A String is written directly
    } else if (declaredClass == String.class) {   // String
        UTF8.writeString(out, (String)instance);
    // Primitive types
    } else if (declaredClass.isPrimitive()) {     // primitive type
        if (declaredClass == Boolean.TYPE) {        // boolean
            out.writeBoolean(((Boolean)instance).booleanValue());
        } else if (declaredClass == Character.TYPE) { // char
            out.writeChar(((Character)instance).charValue());
        } else if (declaredClass == Byte.TYPE) {    // byte
            out.writeByte(((Byte)instance).byteValue());
        } else if (declaredClass == Short.TYPE) {   // short
            out.writeShort(((Short)instance).shortValue());
        } else if (declaredClass == Integer.TYPE) { // int
            out.writeInt(((Integer)instance).intValue());
        } else if (declaredClass == Long.TYPE) {    // long
            out.writeLong(((Long)instance).longValue());
        } else if (declaredClass == Float.TYPE) {   // float
            out.writeFloat(((Float)instance).floatValue());
        } else if (declaredClass == Double.TYPE) {  // double
            out.writeDouble(((Double)instance).doubleValue());
        } else if (declaredClass == Void.TYPE) {    // void
        } else {
            throw new IllegalArgumentException("Not a primitive: "+declaredClass);
        }
    // Enum types: write the constant's name
    } else if (declaredClass.isEnum()) {         // enum
        UTF8.writeString(out, ((Enum)instance).name());
    // Hadoop Writable types: write the runtime class name, then the object itself
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
        UTF8.writeString(out, instance.getClass().getName());
        ((Writable)instance).write(out);
    } else {
        throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
}
public static Object readObject(DataInput in, Configuration conf)
    throws IOException {
    return readObject(in, null, conf);
}

/** Read a {@link Writable}, {@link String}, primitive type, or an array of
 * the preceding. */
@SuppressWarnings("unchecked")
public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
    // Read the class name that was written first
    String className = UTF8.readString(in);
    // First assume it names a primitive type
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    /*
     * If the name is not a primitive type, the lookup returns null and the
     * class is resolved by name through the Configuration instead.
     * This step only obtains the Class; no object is deserialized yet.
     */
    if (declaredClass == null) {
        try {
            declaredClass = conf.getClassByName(className);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("readObject can't find class " + className, e);
        }
    }
    // Holds the deserialized value
    Object instance;
    // Primitive types are deserialized case by case
    if (declaredClass.isPrimitive()) {            // primitive types
        if (declaredClass == Boolean.TYPE) {             // boolean
            instance = Boolean.valueOf(in.readBoolean());
        } else if (declaredClass == Character.TYPE) {    // char
            instance = Character.valueOf(in.readChar());
        } else if (declaredClass == Byte.TYPE) {         // byte
            instance = Byte.valueOf(in.readByte());
        } else if (declaredClass == Short.TYPE) {        // short
            instance = Short.valueOf(in.readShort());
        } else if (declaredClass == Integer.TYPE) {      // int
            instance = Integer.valueOf(in.readInt());
        } else if (declaredClass == Long.TYPE) {         // long
            instance = Long.valueOf(in.readLong());
        } else if (declaredClass == Float.TYPE) {        // float
            instance = Float.valueOf(in.readFloat());
        } else if (declaredClass == Double.TYPE) {       // double
            instance = Double.valueOf(in.readDouble());
        } else if (declaredClass == Void.TYPE) {         // void
            instance = null;
        } else {
            throw new IllegalArgumentException("Not a primitive: "+declaredClass);
        }
    } else if (declaredClass.isArray()) {              // array
        int length = in.readInt();
        instance = Array.newInstance(declaredClass.getComponentType(), length);
        for (int i = 0; i < length; i++) {
            Array.set(instance, i, readObject(in, conf));
        }
    } else if (declaredClass == String.class) {        // String
        instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {         // enum
        instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else {                                      // Writable
        Class instanceClass = null;
        String str = "";
        try {
            // Everything else: read the runtime class name and resolve it through the Configuration
            str = UTF8.readString(in);
            instanceClass = conf.getClassByName(str);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("readObject can't find class " + str, e);
        }
        /*
         * Use the WritableFactories factory to create an instance of
         * instanceClass (which implements Writable), then call that
         * object's own readFields method to deserialize it.
         */
        Writable writable = WritableFactories.newInstance(instanceClass, conf);
        writable.readFields(in);
        instance = writable;
        if (instanceClass == NullInstance.class) {  // null
            declaredClass = ((NullInstance)instance).declaredClass;
            instance = null;
        }
    }
    // Finally, store the deserialized result in the ObjectWritable wrapper, if one was given
    if (objectWritable != null) {                 // store values
        objectWritable.declaredClass = declaredClass;
        objectWritable.instance = instance;
    }
    return instance;
}
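
The core idea of the code above, writing a type name first so that the reader can decide how to decode what follows, can be reduced to a few lines. This is only an illustrative sketch, not the ObjectWritable implementation: TaggedCodec is a hypothetical class, it handles just int and String, and it uses the JDK's writeUTF/readUTF where Hadoop uses its own UTF8 helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, heavily reduced version of the "class name + payload" format.
class TaggedCodec {
    // Write a type name first, then the value itself.
    static void writeObject(DataOutput out, Object value) throws IOException {
        if (value instanceof Integer) {
            out.writeUTF("int");                      // same string as int.class.getName()
            out.writeInt((Integer) value);
        } else if (value instanceof String) {
            out.writeUTF(String.class.getName());     // "java.lang.String"
            out.writeUTF((String) value);
        } else {
            throw new IOException("Can't write: " + value);
        }
    }

    // Read the type name, then decode the payload accordingly.
    static Object readObject(DataInput in) throws IOException {
        String className = in.readUTF();
        if (className.equals("int")) {
            return in.readInt();
        } else if (className.equals(String.class.getName())) {
            return in.readUTF();
        }
        throw new IOException("Unknown type name: " + className);
    }

    // Helper for experiments: encode to a byte array and decode again.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeObject(new DataOutputStream(buffer), value);
            return readObject(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42));
        System.out.println(roundTrip("hadoop"));
    }
}
```

The price of this self-describing format is that the type name travels with every value, which is the overhead GenericWritable is designed to reduce.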


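Special type: GenericWritable
Suppose a reducer receives its input from several mappers, and the mappers' output value types differ. This is the case GenericWritable is meant for. On the map side it is used like this: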
context.write(new Text(str), new MyGenericWritable(new LongWritable(1)));
context.write(new Text(str), new MyGenericWritable(new Text("1")));

In the reducer it is used like this:

long count = 0;
for (MyGenericWritable time : values) {
    // Unwrap the object held by MyGenericWritable
    Writable writable = time.get();
    if (writable instanceof LongWritable) {
        // The wrapped value is a LongWritable
        count += ((LongWritable) writable).get();
    } else if (writable instanceof Text) {
        // The wrapped value is a Text holding a number
        count += Long.parseLong(((Text) writable).toString());
    }
}


The custom MyGenericWritable is defined as follows:

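import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

class MyGenericWritable extends GenericWritable {
    // No-argument constructor
    public MyGenericWritable() {
    }

    // Constructor wrapping a Text
    public MyGenericWritable(Text text) {
        super.set(text);
    }

    // Constructor wrapping a LongWritable
    public MyGenericWritable(LongWritable longWritable) {
        super.set(longWritable);
    }

    // The classes this wrapper is allowed to hold
    @Override
    @SuppressWarnings("unchecked")
    protected Class<? extends Writable>[] getTypes() {
        return new Class[]{LongWritable.class, Text.class};
    }
}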
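
As far as I understand it, GenericWritable stays compact by writing a one-byte index into the getTypes() array in place of a class name, followed by the wrapped object's own bytes. The sketch below imitates that idea under loud assumptions: Writable is a local stand-in, LongValue and TextValue are hypothetical substitutes for LongWritable and Text, and TaggedValue is a simplified imitation, not the Hadoop class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for org.apache.hadoop.io.Writable (assumption: JDK-only sketch).
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical stand-ins for LongWritable and Text.
class LongValue implements Writable {
    long value;
    LongValue() { }
    LongValue(long value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeLong(value); }
    public void readFields(DataInput in) throws IOException { value = in.readLong(); }
}

class TextValue implements Writable {
    String value = "";
    TextValue() { }
    TextValue(String value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeUTF(value); }
    public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
}

// Hypothetical wrapper: one tag byte (index into TYPES), then the wrapped payload.
class TaggedValue implements Writable {
    private static final Class<?>[] TYPES = { LongValue.class, TextValue.class };
    private Writable instance;

    TaggedValue() { }
    TaggedValue(Writable instance) { set(instance); }

    void set(Writable w) {
        if (indexOf(w.getClass()) < 0) {
            throw new IllegalArgumentException("Unregistered type: " + w.getClass().getName());
        }
        instance = w;
    }

    Writable get() { return instance; }

    private static int indexOf(Class<?> c) {
        for (int i = 0; i < TYPES.length; i++) {
            if (TYPES[i] == c) return i;
        }
        return -1;
    }

    public void write(DataOutput out) throws IOException {
        out.writeByte(indexOf(instance.getClass()));  // the type tag
        instance.write(out);                          // the payload
    }

    public void readFields(DataInput in) throws IOException {
        int tag = in.readByte() & 0xff;
        try {
            // Create an empty instance of the tagged type, then let it read itself.
            instance = (Writable) TYPES[tag].getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot instantiate type " + tag, e);
        }
        instance.readFields(in);
    }
}

class GenericDemo {
    // Wrap, serialize, deserialize, and sum mixed values, like the reducer loop above.
    static long sum(Writable... inputs) {
        long count = 0;
        try {
            for (Writable input : inputs) {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                new TaggedValue(input).write(new DataOutputStream(buffer));
                TaggedValue copy = new TaggedValue();
                copy.readFields(new DataInputStream(
                        new ByteArrayInputStream(buffer.toByteArray())));
                Writable w = copy.get();
                if (w instanceof LongValue) {
                    count += ((LongValue) w).value;
                } else if (w instanceof TextValue) {
                    count += Long.parseLong(((TextValue) w).value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(sum(new LongValue(1), new TextValue("41"))); // prints 42
    }
}
```

Because the tag is a position in the type table, the order of the entries has to be identical on the writing and the reading side.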
