Posted by huiselele on 2018-10-31 07:05:06

Hadoop: processing multiple inputs with multiple mappers

The example below uses MultipleInputs to bind a separate Mapper class to each input path (one for department records, one for employee records), with a single reducer merging the two tagged streams.

  package com.smilezl.learn.CalWord;
  import java.io.IOException;
  import java.util.StringTokenizer;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  import org.apache.hadoop.util.GenericOptionsParser;

  public class MultipleMapper {

    // Mapper for the department input: concatenates the tokens of each
    // line and tags the record with the key "dept".
    public static class DeptMapper extends Mapper<Object, Text, Text, Text> {
      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        StringBuilder tmp = new StringBuilder();
        while (tokenizer.hasMoreTokens()) {
          tmp.append(tokenizer.nextToken());
        }
        context.write(new Text("dept"), new Text(tmp.toString()));
      }
    }

    // Mapper for the employee input: same tokenizing, but tagged with the
    // key "emp" so the output distinguishes the two sources.
    public static class EmpMapper extends Mapper<Object, Text, Text, Text> {
      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        StringBuilder tmp = new StringBuilder();
        while (tokenizer.hasMoreTokens()) {
          tmp.append(tokenizer.nextToken());
        }
        context.write(new Text("emp"), new Text(tmp.toString()));
      }
    }

    // Reducer: joins all values grouped under one key ("dept" or "emp")
    // with "__" and emits the key unchanged.
    public static class MulReducer extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        StringBuilder tmp = new StringBuilder();
        for (Text value : values) {
          tmp.append(value.toString()).append("__");
        }
        context.write(key, new Text(tmp.toString()));
      }
    }
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length != 3) {
        System.out.println("Usage: MultipleMapper <dept input> <emp input> <output>");
        System.exit(2);
      }
      Job job = Job.getInstance(conf, "MultipleMapper");
      job.setJarByClass(MultipleMapper.class);
      // Bind a different mapper class to each input path.
      MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, DeptMapper.class);
      MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, EmpMapper.class);
      FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
      // No combiner here: running MulReducer as a combiner would insert
      // extra "__" separators into partially merged values.
      job.setReducerClass(MulReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
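
For reference, a minimal sketch of how the job might be invoked, assuming the classes are packaged into a hypothetical calword.jar and that /input/dept, /input/emp, and /output are placeholder HDFS paths:

  hadoop jar calword.jar com.smilezl.learn.CalWord.MultipleMapper /input/dept /input/emp /output

With a dept file containing the lines "sales 10" and "hr 20", DeptMapper emits ("dept", "sales10") and ("dept", "hr20") (tokens are concatenated without separators), and the reducer writes a single line such as "dept" followed by a tab and "sales10__hr20__", the values joined by "__". The emp input flows through EmpMapper the same way under the "emp" key.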
