设为首页 收藏本站
查看: 730|回复: 0

[经验分享] [Hadoop] Hadoop 链式任务 : ChainMapper and ChainReducer的使用

[复制链接]

尚未签到

发表于 2016-12-5 08:07:23 | 显示全部楼层 |阅读模式
  注意:
  1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。 
  2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。
  比如:
  Map1 -> Map2 -> Reducer -> Map3 -> Map4
  (不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)
  任务介绍:
  这个任务需要两步完成:
  1. 对一篇文章进行WordCount
  2. 统计出现次数超过5词的单词
  WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:

package hadoop_in_action_exersice;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class ChainedJobs {
public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
public static final int LOW_LIMIT = 5;
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer st = new StringTokenizer(line);
while(st.hasMoreTokens())
output.collect(new Text(st.nextToken()), one);
}
}
public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while(values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws IOException {

JobConf conf = new JobConf(ChainedJobs.class);
conf.setJobName("wordcount");           //设置一个用户定义的job名称
conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类
// Remove output folder before run job(s)
FileSystem fs=FileSystem.get(conf);
String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
Path op=new Path(outputPath);
if (fs.exists(op)) {
fs.delete(op, true);
System.out.println("存在此输出路径,已删除!!!");
}
FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
JobClient.runJob(conf);         //运行一个job
}
}

  上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。
  为了方便理解,上面的输入的例子如下:

accessed3
accessible4
accomplish1
accounting7
accurately1
acquire1
across1
actual1
actually1
add3
added2
addition1
additional4
  old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X 
  新的API会方便简洁很多
  下面是增加了一个Mapper 来过滤

public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
@Override
public void map(Text key, IntWritable value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
if(value.get() >= LOW_LIMIT) {
output.collect(key, value);
}
}
}
  这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出
  所以,目前为止,任务链如下:
  TokenizerMapper -> TokenizeReducer -> RangeFilterMapper 
  [size=1em]所以我们的main函数改成下面的样子:

public static void main(String[] args) throws IOException {

JobConf conf = new JobConf(ChainedJobs.class);
conf.setJobName("wordcount");           //设置一个用户定义的job名称
//        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
//        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
//        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
//        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
//        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
//        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
//        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类
// Step1 : mapper forr word count
JobConf wordCountMapper  = new JobConf(false);
ChainMapper.addMapper(conf,
TokenizeMapper.class,
LongWritable.class,// input key type
Text.class, // input value type
Text.class, // output key type
IntWritable.class,// output value type
false, //byValue or byRefference 传值还是传引用
wordCountMapper);
// Step2: reducer for word count
JobConf wordCountReducer  = new JobConf(false);
ChainReducer.setReducer(conf,
TokenizeReducer.class,
Text.class,
IntWritable.class,
Text.class,
IntWritable.class,
false,
wordCountReducer);
// Step3: mapper used as filter
JobConf rangeFilterMapper  = new JobConf(false);
ChainReducer.addMapper(conf,
RangeFilterMapper.class,
Text.class,
IntWritable.class,
Text.class,
IntWritable.class,
false,
rangeFilterMapper);

// Remove output folder before run job(s)
FileSystem fs=FileSystem.get(conf);
String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
Path op=new Path(outputPath);
if (fs.exists(op)) {
fs.delete(op, true);
System.out.println("存在此输出路径,已删除!!!");
}
FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
JobClient.runJob(conf);         //运行一个job
}
  [size=1em]  下面是运行结果的一部分:

a40
and26
are12
as6
be7
been8
but5
by5
can12
change5
data5
files7
for28
from5
has7
have8
if6
in27
is16
it13
more8
not5
of23
on5
outputs5
see6
so11
that11
the54
  [size=1em] 可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-309690-1-1.html 上篇帖子: hadoop:简单的测试例子 下篇帖子: 安装单机模式hadoop的流程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表