xuxiaohui9216 发表于 2016-12-8 11:08:01

(转)hadoop多文件格式输入

  hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式
  原文:http://blog.csdn.net/fansy1990/article/details/26267637
版本:
CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3)
hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式。
比如现在有如下的需求:
现有两份数据:
phone:
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]123,good number  
[*]124,common number  
[*]125,bad number  

  user:
 
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]zhangsan,123  
[*]lisi,124  
[*]wangwu,125  

  
现在需要把user和phone按照phone number连接起来,得到下面的结果:
 
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]zhangsan,123,good number  
[*]lisi,123,common number  
[*]wangwu,125,bad number  

  那么就可以使用MultipleInputs来操作,这里把user和phone上传到hdfs目录中,分别是/multiple/user/user , /multiple/phone/phone。
 
设计的MultipleDriver如下:
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import org.apache.hadoop.conf.Configuration;  
[*]import org.apache.hadoop.conf.Configured;  
[*]import org.apache.hadoop.fs.Path;  
[*]import org.apache.hadoop.io.NullWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.Job;  
[*]import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;  
[*]import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
[*]import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
[*]import org.apache.hadoop.util.Tool;  
[*]import org.apache.hadoop.util.ToolRunner;  
[*]//import org.slf4j.Logger;  
[*]//import org.slf4j.LoggerFactory;  
[*]/** 
[*] * input1(/multiple/user/user): 
[*] * username,user_phone 
[*] *   
[*] * input2(/multiple/phone/phone): 
[*] *  user_phone,description  
[*] *   
[*] * output: username,user_phone,description 
[*] *  
[*] * @author fansy 
[*] * 
[*] */  
[*]public class MultipleDriver extends Configured implements Tool{  
[*]//  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);  
[*]      
[*]    private String input1=null;  
[*]    private String input2=null;  
[*]    private String output=null;  
[*]    private String delimiter=null;  
[*]      
[*]    public static void main(String[] args) throws Exception {  
[*]        Configuration conf=new Configuration();  
[*]//      conf.set("fs.defaultFS", "hdfs://node33:8020");    
[*]//        conf.set("mapreduce.framework.name", "yarn");    
[*]//        conf.set("yarn.resourcemanager.address", "node33:8032");   
[*]          
[*]        ToolRunner.run(conf, new MultipleDriver(), args);  
[*]    }  
[*]  
[*]    @Override  
[*]    public int run(String[] arg0) throws Exception {  
[*]        configureArgs(arg0);  
[*]        checkArgs();  
[*]          
[*]        Configuration conf= getConf();  
[*]        conf.set("delimiter", delimiter);  
[*]         @SuppressWarnings("deprecation")  
[*]        Job job = new Job(conf, "merge user and phone information ");  
[*]        job.setJarByClass(MultipleDriver.class);  
[*]  
[*]        job.setReducerClass(MultipleReducer.class);  
[*]        job.setMapOutputKeyClass(Text.class);  
[*]        job.setMapOutputValueClass(FlagStringDataType.class);  
[*]        job.setOutputKeyClass(Text.class);  
[*]        job.setOutputValueClass(NullWritable.class);  
[*]          
[*]        job.setNumReduceTasks(1);  
[*]        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);  
[*]        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);  
[*]        FileOutputFormat.setOutputPath(job, new Path(output));  
[*]          
[*]        int res = job.waitForCompletion(true) ? 0 : 1;  
[*]        return res;  
[*]    }  
[*]      
[*]  
[*]    /** 
[*]     * check the args  
[*]     */  
[*]    private void checkArgs() {  
[*]        if(input1==null||"".equals(input1)){  
[*]            System.out.println("no user input...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(input2==null||"".equals(input2)){  
[*]            System.out.println("no phone input...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(output==null||"".equals(output)){  
[*]            System.out.println("no output...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(delimiter==null||"".equals(delimiter)){  
[*]            System.out.println("no delimiter...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]      
[*]    }  
[*]  
[*]    /** 
[*]     * configuration the args 
[*]     * @param args 
[*]     */  
[*]    private void configureArgs(String[] args) {  
[*]        for(int i=0;i<args.length;i++){  
[*]            if("-i1".equals(args)){  
[*]                input1=args[++i];  
[*]            }  
[*]            if("-i2".equals(args)){  
[*]                input2=args[++i];  
[*]            }  
[*]              
[*]            if("-o".equals(args)){  
[*]                output=args[++i];  
[*]            }  
[*]              
[*]            if("-delimiter".equals(args)){  
[*]                delimiter=args[++i];  
[*]            }  
[*]              
[*]        }  
[*]    }  
[*]    public static void printUsage(){  
[*]        System.err.println("Usage:");  
[*]        System.err.println("-i1 input \t user data path.");  
[*]        System.err.println("-i2 input \t phone data path.");  
[*]        System.err.println("-o output \t output data path.");  
[*]        System.err.println("-delimiter  data delimiter , default is comma  .");  
[*]    }  
[*]}  

  
这里指定两个mapper和一个reducer,两个mapper分别对应处理user和phone的数据,分别如下:
 
mapper1(处理user数据):
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import java.io.IOException;  
[*]  
[*]import org.apache.hadoop.io.LongWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.Mapper;  
[*]import org.slf4j.Logger;  
[*]import org.slf4j.LoggerFactory;  
[*]/** 
[*] * input : 
[*] * username,phone 
[*] *  
[*] * output: 
[*] * <key,value>  --> <,> 
[*] * @author fansy 
[*] * 
[*] */  
[*]public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
[*]    private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);  
[*]    private String delimiter=null; // default is comma  
[*]    @Override  
[*]    public void setup(Context cxt){  
[*]        delimiter= cxt.getConfiguration().get("delimiter", ",");  
[*]        log.info("This is the begin of Multiple1Mapper");  
[*]    }   
[*]      
[*]    @Override  
[*]    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
[*]        String info= new String(value.getBytes(),"UTF-8");  
[*]        String[] values = info.split(delimiter);  
[*]        if(values.length!=2){  
[*]            return;  
[*]        }  
[*]        log.info("key-->"+values[1]+"=========value-->"+"+values[0]+"]");  
[*]        cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));  
[*]    }  
[*]}  

  
mapper2(处理phone数据):
 
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import java.io.IOException;  
[*]  
[*]import org.apache.hadoop.io.LongWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.Mapper;  
[*]import org.slf4j.Logger;  
[*]import org.slf4j.LoggerFactory;  
[*]/** 
[*] * input : 
[*] * phone,description 
[*] *  
[*] * output: 
[*] * <key,value>  --> <,> 
[*] * @author fansy 
[*] * 
[*] */  
[*]public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
[*]    private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);  
[*]    private String delimiter=null; // default is comma  
[*]    @Override  
[*]    public void setup(Context cxt){  
[*]        delimiter= cxt.getConfiguration().get("delimiter", ",");  
[*]        log.info("This is the begin of Multiple2Mapper");  
[*]    }   
[*]      
[*]    @Override  
[*]    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
[*]        String[] values= value.toString().split(delimiter);  
[*]        if(values.length!=2){  
[*]            return;  
[*]        }  
[*]        log.info("key-->"+values[0]+"=========value-->"+"+values[1]+"]");  
[*]        cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));  
[*]    }  
[*]}  

  这里的FlagStringDataType是自定义的:
 
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import java.io.DataInput;  
[*]import java.io.DataOutput;  
[*]import java.io.IOException;  
[*]  
[*]import org.apache.hadoop.io.WritableComparable;  
[*]import org.slf4j.Logger;  
[*]import org.slf4j.LoggerFactory;  
[*]  
[*]import com.google.common.primitives.Ints;  
[*]  
[*]public class FlagStringDataType implements WritableComparable<FlagStringDataType> {  
[*]    private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);  
[*]  private String value;  
[*]  private int flag;  
[*]  public FlagStringDataType() {  
[*]  }  
[*]  
[*]  public FlagStringDataType(int flag,String value) {  
[*]    this.value = value;  
[*]    this.flag=flag;  
[*]  }  
[*]  
[*]  public String get() {  
[*]    return value;  
[*]  }  
[*]  
[*]  public void set(String value) {  
[*]    this.value = value;  
[*]  }  
[*]  
[*]  @Override  
[*]  public boolean equals(Object other) {  
[*]    return other != null && getClass().equals(other.getClass())   
[*]            && ((FlagStringDataType) other).get() == value  
[*]            &&((FlagStringDataType) other).getFlag()==flag;  
[*]  }  
[*]  
[*]  @Override  
[*]  public int hashCode() {  
[*]    return Ints.hashCode(flag)+value.hashCode();  
[*]  }  
[*]  
[*]  @Override  
[*]  public int compareTo(FlagStringDataType other) {  
[*]       
[*]    if (flag >= other.flag) {  
[*]      if (flag > other.flag) {  
[*]        return 1;  
[*]      }  
[*]    } else {  
[*]      return -1;  
[*]    }  
[*]    return value.compareTo(other.value);  
[*]  }  
[*]  
[*]  @Override  
[*]  public void write(DataOutput out) throws IOException {  
[*]    log.info("in write()::"+"flag:"+flag+",vlaue:"+value);  
[*]    out.writeInt(flag);  
[*]    out.writeUTF(value);  
[*]  }  
[*]  
[*]  @Override  
[*]  public void readFields(DataInput in) throws IOException {  
[*]      log.info("in read()::"+"flag:"+flag+",vlaue:"+value);  
[*]      flag=in.readInt();  
[*]      value = in.readUTF();  
[*]      log.info("in read()::"+"flag:"+flag+",vlaue:"+value);  
[*]  }  
[*]  
[*]public int getFlag() {  
[*]    return flag;  
[*]}  
[*]  
[*]public void setFlag(int flag) {  
[*]    this.flag = flag;  
[*]}  
[*]  
[*]public String toString(){  
[*]    return flag+":"+value;  
[*]}  
[*]  
[*]}  

  
这个自定义类,使用一个flag来指定是哪个数据,而value则对应是其值。这样做的好处是在reduce端可以根据flag的值来判断其输出位置,这种设计方式可以对多种输入的整合有很大帮助,在mahout中也可以看到这样的设计。
 
reducer(汇总输出数据):
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import java.io.IOException;  
[*]  
[*]import org.apache.hadoop.io.NullWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.Reducer;  
[*]import org.slf4j.Logger;  
[*]import org.slf4j.LoggerFactory;  
[*]  
[*]public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{  
[*]    private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);  
[*]    private String delimiter=null; // default is comma  
[*]    @Override  
[*]    public void setup(Context cxt){  
[*]        delimiter= cxt.getConfiguration().get("delimiter", ",");  
[*]    }   
[*]    @Override  
[*]    public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{  
[*]        log.info("================");  
[*]        log.info("         =======");  
[*]        log.info("              ==");  
[*]        String[] value= new String[3];  
[*]        value[2]=key.toString();  
[*]        for(FlagStringDataType v:values){  
[*]            int index= v.getFlag();  
[*]            log.info("index:"+index+"-->value:"+v.get());  
[*]            value= v.get();  
[*]        }  
[*]        log.info("              ==");  
[*]        log.info("         =======");  
[*]        log.info("================");  
[*]        cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());  
[*]    }  
[*]}  

  
这样设计的好处是,可以针对不同的输入数据采取不同的逻辑处理,而且不同的输入数据可以是序列文件的格式。
 
下面介绍一种方式和上面的比,略有不足,但是可以借鉴。
首先是Driver:
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import org.apache.hadoop.conf.Configuration;  
[*]import org.apache.hadoop.conf.Configured;  
[*]import org.apache.hadoop.fs.Path;  
[*]import org.apache.hadoop.io.NullWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.Job;  
[*]import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
[*]import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
[*]import org.apache.hadoop.util.Tool;  
[*]import org.apache.hadoop.util.ToolRunner;  
[*]//import org.slf4j.Logger;  
[*]//import org.slf4j.LoggerFactory;  
[*]/** 
[*] * input1(/multiple/user/user): 
[*] * username,user_phone 
[*] *   
[*] * input2(/multiple/phone/phone): 
[*] *  user_phone,description  
[*] *   
[*] * output: username,user_phone,description 
[*] *  
[*] * @author fansy 
[*] * 
[*] */  
[*]public class MultipleDriver2 extends Configured implements Tool{  
[*]//  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);  
[*]      
[*]    private String input1=null;  
[*]    private String input2=null;  
[*]    private String output=null;  
[*]    private String delimiter=null;  
[*]      
[*]    public static void main(String[] args) throws Exception {  
[*]        Configuration conf=new Configuration();  
[*]//      conf.set("fs.defaultFS", "hdfs://node33:8020");    
[*]//        conf.set("mapreduce.framework.name", "yarn");    
[*]//        conf.set("yarn.resourcemanager.address", "node33:8032");   
[*]          
[*]        ToolRunner.run(conf, new MultipleDriver2(), args);  
[*]    }  
[*]  
[*]    @Override  
[*]    public int run(String[] arg0) throws Exception {  
[*]        configureArgs(arg0);  
[*]        checkArgs();  
[*]          
[*]        Configuration conf= getConf();  
[*]        conf.set("delimiter", delimiter);  
[*]         @SuppressWarnings("deprecation")  
[*]        Job job = new Job(conf, "merge user and phone information ");  
[*]        job.setJarByClass(MultipleDriver2.class);  
[*]        job.setMapperClass(MultipleMapper.class);  
[*]        job.setReducerClass(MultipleReducer.class);  
[*]        job.setMapOutputKeyClass(Text.class);  
[*]        job.setMapOutputValueClass(FlagStringDataType.class);  
[*]        job.setOutputKeyClass(Text.class);  
[*]        job.setOutputValueClass(NullWritable.class);  
[*]          
[*]        job.setNumReduceTasks(1);  
[*]        FileInputFormat.addInputPath(job, new Path(input1));  
[*]        FileInputFormat.addInputPath(job, new Path(input2));  
[*]        FileOutputFormat.setOutputPath(job, new Path(output));  
[*]          
[*]        int res = job.waitForCompletion(true) ? 0 : 1;  
[*]        return res;  
[*]    }  
[*]      
[*]  
[*]    /** 
[*]     * check the args  
[*]     */  
[*]    private void checkArgs() {  
[*]        if(input1==null||"".equals(input1)){  
[*]            System.out.println("no user input...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(input2==null||"".equals(input2)){  
[*]            System.out.println("no phone input...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(output==null||"".equals(output)){  
[*]            System.out.println("no output...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]        if(delimiter==null||"".equals(delimiter)){  
[*]            System.out.println("no delimiter...");  
[*]            printUsage();  
[*]            System.exit(-1);  
[*]        }  
[*]      
[*]    }  
[*]  
[*]    /** 
[*]     * configuration the args 
[*]     * @param args 
[*]     */  
[*]    private void configureArgs(String[] args) {  
[*]        for(int i=0;i<args.length;i++){  
[*]            if("-i1".equals(args)){  
[*]                input1=args[++i];  
[*]            }  
[*]            if("-i2".equals(args)){  
[*]                input2=args[++i];  
[*]            }  
[*]              
[*]            if("-o".equals(args)){  
[*]                output=args[++i];  
[*]            }  
[*]              
[*]            if("-delimiter".equals(args)){  
[*]                delimiter=args[++i];  
[*]            }  
[*]              
[*]        }  
[*]    }  
[*]    public static void printUsage(){  
[*]        System.err.println("Usage:");  
[*]        System.err.println("-i1 input \t user data path.");  
[*]        System.err.println("-i2 input \t phone data path.");  
[*]        System.err.println("-o output \t output data path.");  
[*]        System.err.println("-delimiter  data delimiter , default is comma  .");  
[*]    }  
[*]}  

  
这里添加路径直接使用FileInputFormat添加输入路径,这样的话,针对不同的输入数据的不同业务逻辑可以在mapper中先判断目前正在处理的是那个数据,然后根据其路径来进行相应的业务逻辑处理:
 
 
 view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.csdn.net/assets/ico_fork.svg 



[*]package multiple.input;  
[*]  
[*]import java.io.IOException;  
[*]  
[*]import org.apache.hadoop.io.LongWritable;  
[*]import org.apache.hadoop.io.Text;  
[*]import org.apache.hadoop.mapreduce.InputSplit;  
[*]import org.apache.hadoop.mapreduce.Mapper;  
[*]import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
[*]/** 
[*] * input1 : 
[*] * username,phone 
[*] *  
[*] * input2 
[*] * phone,description 
[*] *  
[*] * output: 
[*] * <key,value>  --> <,> 
[*] * <key,value>  --> <,> 
[*] * @author fansy 
[*] * 
[*] */  
[*]public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{  
[*]      
[*]    private String delimiter=null; // default is comma  
[*]    private boolean flag=false;  
[*]    @Override  
[*]    public void setup(Context cxt){  
[*]        delimiter= cxt.getConfiguration().get("delimiter", ",");  
[*]        InputSplit input=cxt.getInputSplit();    
[*]        String filename=((FileSplit) input).getPath().getParent().getName();  
[*]        if("user".equals(filename)){  
[*]            flag=true;  
[*]        }  
[*]    }   
[*]      
[*]    @Override  
[*]    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{  
[*]        String[] values= value.toString().split(delimiter);  
[*]        if(values.length!=2){  
[*]            return;  
[*]        }  
[*]        if(flag){  
[*]            cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));  
[*]        }else{  
[*]            cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));  
[*]        }  
[*]    }  
[*]}  

  
总体来说,这种处理方式其实是不如第一种的,在每个map函数中都需要进行判断,比第一种多了很多操作;同时,针对不同的序列文件,这种方式处理不了(Key、value的类型不一样的情况下)。所以针对多文件格式的输入,最好还是使用第一种方式。
 
 
 
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
页: [1]
查看完整版本: (转)hadoop多文件格式输入