jiang1799 发表于 2017-12-18 12:16:40

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)(四)

  不多说,直接上干货!
  Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。
  举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的。
  注意:
  1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。
  2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。
  比如:
  Map1 -> Map2 -> Reducer -> Map3 -> Map4
  (不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)

  在编程的时候,我们可以借用源码提供给我们的程序!在此基础上进行修改和编写。
  比如我的源码本地目录如下:(找我的本地ChainMapper和ChainReducer案例)
  D:\SoftWare\hadoop-2.2.0-src\hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-core\src\main\java\org\apache\hadoop\mapreduce\lib\chain

  任务介绍:
  这个任务需要两步完成:
  1. 对一篇文章进行WordCount
  2. 统计出现次数超过5词的单词
  WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:
  Java代码

[*]package hadoop_in_action_exersice;
[*]
[*]import java.io.IOException;
[*]import java.util.Iterator;
[*]import java.util.StringTokenizer;
[*]
[*]import org.apache.hadoop.fs.FileSystem;
[*]import org.apache.hadoop.fs.Path;
[*]import org.apache.hadoop.io.IntWritable;
[*]import org.apache.hadoop.io.LongWritable;
[*]import org.apache.hadoop.io.Text;
[*]import org.apache.hadoop.mapred.FileInputFormat;
[*]import org.apache.hadoop.mapred.FileOutputFormat;
[*]import org.apache.hadoop.mapred.JobClient;
[*]import org.apache.hadoop.mapred.JobConf;
[*]import org.apache.hadoop.mapred.MapReduceBase;
[*]import org.apache.hadoop.mapred.Mapper;
[*]import org.apache.hadoop.mapred.OutputCollector;
[*]import org.apache.hadoop.mapred.Reducer;
[*]import org.apache.hadoop.mapred.Reporter;
[*]import org.apache.hadoop.mapred.TextInputFormat;
[*]import org.apache.hadoop.mapred.TextOutputFormat;
[*]
[*]public class ChainedJobs {
[*]
[*]    public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
[*]
[*]      private final static IntWritable one = new IntWritable(1);
[*]      public static final int LOW_LIMIT = 5;
[*]      @Override
[*]      public void map(LongWritable key, Text value,
[*]                OutputCollector<Text, IntWritable> output, Reporter reporter)
[*]                throws IOException {
[*]            String line = value.toString();
[*]            StringTokenizer st = new StringTokenizer(line);
[*]            while(st.hasMoreTokens())
[*]                output.collect(new Text(st.nextToken()), one);
[*]
[*]      }
[*]
[*]    }
[*]
[*]    public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
[*]
[*]      @Override
[*]      public void reduce(Text key, Iterator<IntWritable> values,
[*]                OutputCollector<Text, IntWritable> output, Reporter reporter)
[*]                throws IOException {
[*]            int sum = 0;
[*]            while(values.hasNext()) {
[*]                sum += values.next().get();
[*]            }
[*]            output.collect(key, new IntWritable(sum));
[*]      }
[*]
[*]    }
[*]
[*]
[*]    public static void main(String[] args) throws IOException {
[*]
[*]
[*]      JobConf conf = new JobConf(ChainedJobs.class);
[*]      conf.setJobName("wordcount");         //设置一个用户定义的job名称
[*]      conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
[*]      conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
[*]      conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
[*]      conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
[*]      conf.setReducerClass(TokenizeReducer.class);      //为job设置Reduce类
[*]      conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
[*]      conf.setOutputFormat(TextOutputFormat.class);//为map-reduce任务设置OutputFormat实现类
[*]
[*]      // Remove output folder before run job(s)
[*]      FileSystem fs=FileSystem.get(conf);
[*]      String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
[*]      Path op=new Path(outputPath);
[*]      if (fs.exists(op)) {
[*]            fs.delete(op, true);
[*]            System.out.println("存在此输出路径,已删除!!!");
[*]      }
[*]
[*]      FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
[*]      FileOutputFormat.setOutputPath(conf, new Path(outputPath));
[*]      JobClient.runJob(conf);         //运行一个job
[*]    }
[*]
[*]}
  上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。
  为了方便理解,上面的输入的例子如下:
  Java代码

[*]accessed    3
[*]accessible4
[*]accomplish1
[*]accounting7
[*]accurately1
[*]acquire 1
[*]across1
[*]actual1
[*]actually    1
[*]add 3
[*]added   2
[*]addition    1
[*]additional4
  old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X
  新的API会方便简洁很多
  下面是增加了一个Mapper 来过滤
  Java代码

[*]public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
[*]
[*]    @Override
[*]    public void map(Text key, IntWritable value,
[*]            OutputCollector<Text, IntWritable> output, Reporter reporter)
[*]            throws IOException {
[*]
[*]      if(value.get() >= LOW_LIMIT) {
[*]            output.collect(key, value);
[*]      }
[*]
[*]    }
[*]}
  这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出
  所以,目前为止,任务链如下:
  TokenizerMapper -> TokenizeReducer -> RangeFilterMapper
  所以我们的main函数改成下面的样子:
  Java代码

[*]public static void main(String[] args) throws IOException {
[*]
[*]
[*]    JobConf conf = new JobConf(ChainedJobs.class);
[*]    conf.setJobName("wordcount");         //设置一个用户定义的job名称
[*]//      conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
[*]//      conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
[*]//      conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
[*]//      conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
[*]//      conf.setReducerClass(TokenizeReducer.class);      //为job设置Reduce类
[*]//      conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
[*]//      conf.setOutputFormat(TextOutputFormat.class);//为map-reduce任务设置OutputFormat实现类
[*]
[*]    // Step1 : mapper forr word count
[*]    JobConf wordCountMapper= new JobConf(false);
[*]    ChainMapper.addMapper(conf,
[*]            TokenizeMapper.class,
[*]            LongWritable.class,   // input key type
[*]            Text.class,             // input value type
[*]            Text.class,             // output key type
[*]            IntWritable.class,      // output value type
[*]            false,                  //byValue or byRefference 传值还是传引用
[*]            wordCountMapper);
[*]
[*]    // Step2: reducer for word count
[*]    JobConf wordCountReducer= new JobConf(false);
[*]    ChainReducer.setReducer(conf,
[*]            TokenizeReducer.class,
[*]            Text.class,
[*]            IntWritable.class,
[*]            Text.class,
[*]            IntWritable.class,
[*]            false,
[*]            wordCountReducer);
[*]
[*]      // Step3: mapper used as filter
[*]    JobConf rangeFilterMapper= new JobConf(false);
[*]    ChainReducer.addMapper(conf,
[*]            RangeFilterMapper.class,
[*]            Text.class,
[*]            IntWritable.class,
[*]            Text.class,
[*]            IntWritable.class,
[*]            false,
[*]            rangeFilterMapper);
[*]
[*]
[*]    // Remove output folder before run job(s)
[*]    FileSystem fs=FileSystem.get(conf);
[*]    String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
[*]    Path op=new Path(outputPath);
[*]    if (fs.exists(op)) {
[*]      fs.delete(op, true);
[*]      System.out.println("存在此输出路径,已删除!!!");
[*]    }
[*]
[*]    FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
[*]    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
[*]    JobClient.runJob(conf);         //运行一个job
[*]}
  下面是运行结果的一部分:
  Java代码

[*]a   40
[*]and 26
[*]are 12
[*]as6
[*]be7
[*]been    8
[*]but 5
[*]by5
[*]can 12
[*]change5
[*]data    5
[*]files   7
[*]for 28
[*]from    5
[*]has 7
[*]have    8
[*]if6
[*]in27
[*]is16
[*]it13
[*]more    8
[*]not 5
[*]of23
[*]on5
[*]outputs 5
[*]see 6
[*]so11
[*]that    11
[*]the 54
  可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。
页: [1]
查看完整版本: Hadoop的ChainMapper和ChainReducer使用案例(链式处理)(四)