设为首页 收藏本站
查看: 1169|回复: 0

[经验分享] (转)hadoop多文件格式输入

[复制链接]

尚未签到

发表于 2016-12-8 11:08:01 | 显示全部楼层 |阅读模式
  hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式
  原文:http://blog.csdn.net/fansy1990/article/details/26267637
版本:
CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3)
hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式。
比如现在有如下的需求:
现有两份数据:
phone:
 
[plain] view plaincopy 



  • 123,good number  
  • 124,common number  
  • 125,bad number  

  user:
 
 
[plain] view plaincopy 



  • zhangsan,123  
  • lisi,124  
  • wangwu,125  

  
现在需要把user和phone按照phone number连接起来,得到下面的结果:
 
 
[plain] view plaincopy 



  • zhangsan,123,good number  
  • lisi,123,common number  
  • wangwu,125,bad number  

  那么就可以使用MultipleInputs来操作,这里把user和phone上传到hdfs目录中,分别是/multiple/user/user , /multiple/phone/phone。
 
设计的MultipleDriver如下:
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import org.apache.hadoop.conf.Configuration;  
  • import org.apache.hadoop.conf.Configured;  
  • import org.apache.hadoop.fs.Path;  
  • import org.apache.hadoop.io.NullWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Job;  
  • import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;  
  • import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.hadoop.util.Tool;  
  • import org.apache.hadoop.util.ToolRunner;  
  • //import org.slf4j.Logger;  
  • //import org.slf4j.LoggerFactory;  
  • /** 
  •  * input1(/multiple/user/user): 
  •  * username,user_phone 
  •  *   
  •  * input2(/multiple/phone/phone): 
  •  *  user_phone,description  
  •  *   
  •  * output: username,user_phone,description 
  •  *  
  •  * @author fansy 
  •  * 
  •  */  
  • public class MultipleDriver extends Configured implements Tool{  
  • //  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);  
  •       
  •     private String input1=null;  
  •     private String input2=null;  
  •     private String output=null;  
  •     private String delimiter=null;  
  •       
  •     public static void main(String[] args) throws Exception {  
  •         Configuration conf=new Configuration();  
  • //      conf.set("fs.defaultFS", "hdfs://node33:8020");    
  • //        conf.set("mapreduce.framework.name", "yarn");    
  • //        conf.set("yarn.resourcemanager.address", "node33:8032");   
  •           
  •         ToolRunner.run(conf, new MultipleDriver(), args);  
  •     }  
  •   
  •     @Override  
  •     public int run(String[] arg0) throws Exception {  
  •         configureArgs(arg0);  
  •         checkArgs();  
  •           
  •         Configuration conf= getConf();  
  •         conf.set("delimiter", delimiter);  
  •          @SuppressWarnings("deprecation")  
  •         Job job = new Job(conf, "merge user and phone information ");  
  •         job.setJarByClass(MultipleDriver.class);  
  •   
  •         job.setReducerClass(MultipleReducer.class);  
  •         job.setMapOutputKeyClass(Text.class);  
  •         job.setMapOutputValueClass(FlagStringDataType.class);  
  •         job.setOutputKeyClass(Text.class);  
  •         job.setOutputValueClass(NullWritable.class);  
  •           
  •         job.setNumReduceTasks(1);  
  •         MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);  
  •         MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);  
  •         FileOutputFormat.setOutputPath(job, new Path(output));  
  •           
  •         int res = job.waitForCompletion(true) ? 0 : 1;  
  •         return res;  
  •     }  
  •       
  •   
  •     /** 
  •      * check the args  
  •      */  
  •     private void checkArgs() {  
  •         if(input1==null||"".equals(input1)){  
  •             System.out.println("no user input...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(input2==null||"".equals(input2)){  
  •             System.out.println("no phone input...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(output==null||"".equals(output)){  
  •             System.out.println("no output...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(delimiter==null||"".equals(delimiter)){  
  •             System.out.println("no delimiter...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •       
  •     }  
  •   
  •     /** 
  •      * configuration the args 
  •      * @param args 
  •      */  
  •     private void configureArgs(String[] args) {  
  •         for(int i=0;i<args.length;i++){  
  •             if("-i1".equals(args)){  
  •                 input1=args[++i];  
  •             }  
  •             if("-i2".equals(args)){  
  •                 input2=args[++i];  
  •             }  
  •               
  •             if("-o".equals(args)){  
  •                 output=args[++i];  
  •             }  
  •               
  •             if("-delimiter".equals(args)){  
  •                 delimiter=args[++i];  
  •             }  
  •               
  •         }  
  •     }  
  •     public static void printUsage(){  
  •         System.err.println("Usage:");  
  •         System.err.println("-i1 input \t user data path.");  
  •         System.err.println("-i2 input \t phone data path.");  
  •         System.err.println("-o output \t output data path.");  
  •         System.err.println("-delimiter  data delimiter , default is comma  .");  
  •     }  
  • }  

  
这里指定两个mapper和一个reducer,两个mapper分别对应处理user和phone的数据,分别如下:
 
mapper1(处理user数据):
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.LongWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Mapper;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  • /** 
  •  * input : 
  •  * username,phone 
  •  *  
  •  * output: 
  •  * <key,value>  --> <[phone],[0,username]> 
  •  * @author fansy 
  •  * 
  •  */  
  • public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
  •     private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);  
  •     private String delimiter=null// default is comma  
  •     @Override  
  •     public void setup(Context cxt){  
  •         delimiter= cxt.getConfiguration().get("delimiter"",");  
  •         log.info("This is the begin of Multiple1Mapper");  
  •     }   
  •       
  •     @Override  
  •     public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
  •         String info= new String(value.getBytes(),"UTF-8");  
  •         String[] values = info.split(delimiter);  
  •         if(values.length!=2){  
  •             return;  
  •         }  
  •         log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");  
  •         cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));  
  •     }  
  • }  

  
mapper2(处理phone数据):
 
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.LongWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Mapper;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  • /** 
  •  * input : 
  •  * phone,description 
  •  *  
  •  * output: 
  •  * <key,value>  --> <[phone],[1,description]> 
  •  * @author fansy 
  •  * 
  •  */  
  • public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
  •     private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);  
  •     private String delimiter=null// default is comma  
  •     @Override  
  •     public void setup(Context cxt){  
  •         delimiter= cxt.getConfiguration().get("delimiter"",");  
  •         log.info("This is the begin of Multiple2Mapper");  
  •     }   
  •       
  •     @Override  
  •     public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
  •         String[] values= value.toString().split(delimiter);  
  •         if(values.length!=2){  
  •             return;  
  •         }  
  •         log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");  
  •         cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));  
  •     }  
  • }  

  这里的FlagStringDataType是自定义的:
 
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import java.io.DataInput;  
  • import java.io.DataOutput;  
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.WritableComparable;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  •   
  • import com.google.common.primitives.Ints;  
  •   
  • public class FlagStringDataType implements WritableComparable<FlagStringDataType> {  
  •     private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);  
  •   private String value;  
  •   private int flag;  
  •   public FlagStringDataType() {  
  •   }  
  •   
  •   public FlagStringDataType(int flag,String value) {  
  •     this.value = value;  
  •     this.flag=flag;  
  •   }  
  •   
  •   public String get() {  
  •     return value;  
  •   }  
  •   
  •   public void set(String value) {  
  •     this.value = value;  
  •   }  
  •   
  •   @Override  
  •   public boolean equals(Object other) {  
  •     return other != null && getClass().equals(other.getClass())   
  •             && ((FlagStringDataType) other).get() == value  
  •             &&((FlagStringDataType) other).getFlag()==flag;  
  •   }  
  •   
  •   @Override  
  •   public int hashCode() {  
  •     return Ints.hashCode(flag)+value.hashCode();  
  •   }  
  •   
  •   @Override  
  •   public int compareTo(FlagStringDataType other) {  
  •        
  •     if (flag >= other.flag) {  
  •       if (flag > other.flag) {  
  •         return 1;  
  •       }  
  •     } else {  
  •       return -1;  
  •     }  
  •     return value.compareTo(other.value);  
  •   }  
  •   
  •   @Override  
  •   public void write(DataOutput out) throws IOException {  
  •     log.info("in write()::"+"flag:"+flag+",vlaue:"+value);  
  •     out.writeInt(flag);  
  •     out.writeUTF(value);  
  •   }  
  •   
  •   @Override  
  •   public void readFields(DataInput in) throws IOException {  
  •       log.info("in read()::"+"flag:"+flag+",vlaue:"+value);  
  •       flag=in.readInt();  
  •       value = in.readUTF();  
  •       log.info("in read()::"+"flag:"+flag+",vlaue:"+value);  
  •   }  
  •   
  • public int getFlag() {  
  •     return flag;  
  • }  
  •   
  • public void setFlag(int flag) {  
  •     this.flag = flag;  
  • }  
  •   
  • public String toString(){  
  •     return flag+":"+value;  
  • }  
  •   
  • }  

  
这个自定义类,使用一个flag来指定是哪个数据,而value则对应是其值。这样做的好处是在reduce端可以根据flag的值来判断其输出位置,这种设计方式可以对多种输入的整合有很大帮助,在mahout中也可以看到这样的设计。
 
reducer(汇总输出数据):
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.NullWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Reducer;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  •   
  • public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{  
  •     private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);  
  •     private String delimiter=null// default is comma  
  •     @Override  
  •     public void setup(Context cxt){  
  •         delimiter= cxt.getConfiguration().get("delimiter"",");  
  •     }   
  •     @Override  
  •     public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{  
  •         log.info("================");  
  •         log.info("         =======");  
  •         log.info("              ==");  
  •         String[] value= new String[3];  
  •         value[2]=key.toString();  
  •         for(FlagStringDataType v:values){  
  •             int index= v.getFlag();  
  •             log.info("index:"+index+"-->value:"+v.get());  
  •             value[index]= v.get();  
  •         }  
  •         log.info("              ==");  
  •         log.info("         =======");  
  •         log.info("================");  
  •         cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());  
  •     }  
  • }  

  
这样设计的好处是,可以针对不同的输入数据采取不同的逻辑处理,而且不同的输入数据可以是序列文件的格式。
 
下面介绍一种方式和上面的比,略有不足,但是可以借鉴。
首先是Driver:
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import org.apache.hadoop.conf.Configuration;  
  • import org.apache.hadoop.conf.Configured;  
  • import org.apache.hadoop.fs.Path;  
  • import org.apache.hadoop.io.NullWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Job;  
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.hadoop.util.Tool;  
  • import org.apache.hadoop.util.ToolRunner;  
  • //import org.slf4j.Logger;  
  • //import org.slf4j.LoggerFactory;  
  • /** 
  •  * input1(/multiple/user/user): 
  •  * username,user_phone 
  •  *   
  •  * input2(/multiple/phone/phone): 
  •  *  user_phone,description  
  •  *   
  •  * output: username,user_phone,description 
  •  *  
  •  * @author fansy 
  •  * 
  •  */  
  • public class MultipleDriver2 extends Configured implements Tool{  
  • //  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);  
  •       
  •     private String input1=null;  
  •     private String input2=null;  
  •     private String output=null;  
  •     private String delimiter=null;  
  •       
  •     public static void main(String[] args) throws Exception {  
  •         Configuration conf=new Configuration();  
  • //      conf.set("fs.defaultFS", "hdfs://node33:8020");    
  • //        conf.set("mapreduce.framework.name", "yarn");    
  • //        conf.set("yarn.resourcemanager.address", "node33:8032");   
  •           
  •         ToolRunner.run(conf, new MultipleDriver2(), args);  
  •     }  
  •   
  •     @Override  
  •     public int run(String[] arg0) throws Exception {  
  •         configureArgs(arg0);  
  •         checkArgs();  
  •           
  •         Configuration conf= getConf();  
  •         conf.set("delimiter", delimiter);  
  •          @SuppressWarnings("deprecation")  
  •         Job job = new Job(conf, "merge user and phone information ");  
  •         job.setJarByClass(MultipleDriver2.class);  
  •         job.setMapperClass(MultipleMapper.class);  
  •         job.setReducerClass(MultipleReducer.class);  
  •         job.setMapOutputKeyClass(Text.class);  
  •         job.setMapOutputValueClass(FlagStringDataType.class);  
  •         job.setOutputKeyClass(Text.class);  
  •         job.setOutputValueClass(NullWritable.class);  
  •           
  •         job.setNumReduceTasks(1);  
  •         FileInputFormat.addInputPath(job, new Path(input1));  
  •         FileInputFormat.addInputPath(job, new Path(input2));  
  •         FileOutputFormat.setOutputPath(job, new Path(output));  
  •           
  •         int res = job.waitForCompletion(true) ? 0 : 1;  
  •         return res;  
  •     }  
  •       
  •   
  •     /** 
  •      * check the args  
  •      */  
  •     private void checkArgs() {  
  •         if(input1==null||"".equals(input1)){  
  •             System.out.println("no user input...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(input2==null||"".equals(input2)){  
  •             System.out.println("no phone input...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(output==null||"".equals(output)){  
  •             System.out.println("no output...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •         if(delimiter==null||"".equals(delimiter)){  
  •             System.out.println("no delimiter...");  
  •             printUsage();  
  •             System.exit(-1);  
  •         }  
  •       
  •     }  
  •   
  •     /** 
  •      * configuration the args 
  •      * @param args 
  •      */  
  •     private void configureArgs(String[] args) {  
  •         for(int i=0;i<args.length;i++){  
  •             if("-i1".equals(args)){  
  •                 input1=args[++i];  
  •             }  
  •             if("-i2".equals(args)){  
  •                 input2=args[++i];  
  •             }  
  •               
  •             if("-o".equals(args)){  
  •                 output=args[++i];  
  •             }  
  •               
  •             if("-delimiter".equals(args)){  
  •                 delimiter=args[++i];  
  •             }  
  •               
  •         }  
  •     }  
  •     public static void printUsage(){  
  •         System.err.println("Usage:");  
  •         System.err.println("-i1 input \t user data path.");  
  •         System.err.println("-i2 input \t phone data path.");  
  •         System.err.println("-o output \t output data path.");  
  •         System.err.println("-delimiter  data delimiter , default is comma  .");  
  •     }  
  • }  

  
这里添加路径直接使用FileInputFormat添加输入路径,这样的话,针对不同的输入数据的不同业务逻辑可以在mapper中先判断目前正在处理的是那个数据,然后根据其路径来进行相应的业务逻辑处理:
 
 
[java] view plaincopy 



  • package multiple.input;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.LongWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.InputSplit;  
  • import org.apache.hadoop.mapreduce.Mapper;  
  • import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  • /** 
  •  * input1 : 
  •  * username,phone 
  •  *  
  •  * input2 
  •  * phone,description 
  •  *  
  •  * output: 
  •  * <key,value>  --> <[phone],[0,username]> 
  •  * <key,value>  --> <[phone],[1,description]> 
  •  * @author fansy 
  •  * 
  •  */  
  • public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
  •       
  •     private String delimiter=null// default is comma  
  •     private boolean flag=false;  
  •     @Override  
  •     public void setup(Context cxt){  
  •         delimiter= cxt.getConfiguration().get("delimiter"",");  
  •         InputSplit input=cxt.getInputSplit();    
  •         String filename=((FileSplit) input).getPath().getParent().getName();  
  •         if("user".equals(filename)){  
  •             flag=true;  
  •         }  
  •     }   
  •       
  •     @Override  
  •     public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
  •         String[] values= value.toString().split(delimiter);  
  •         if(values.length!=2){  
  •             return;  
  •         }  
  •         if(flag){  
  •             cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));  
  •         }else{  
  •             cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));  
  •         }  
  •     }  
  • }  

  
总体来说,这种处理方式其实是不如第一种的,在每个map函数中都需要进行判断,比第一种多了很多操作;同时,针对不同的序列文件,这种方式处理不了(Key、value的类型不一样的情况下)。所以针对多文件格式的输入,最好还是使用第一种方式。
 
 
 
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311447-1-1.html 上篇帖子: hadoop学习5——从start-all.sh入手调试源码 下篇帖子: zz:解决Window环境下启动Hadoop时出现 java.lang.NoClassDefFoundError: org/apache/hadoop/util
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表