设为首页 收藏本站
查看: 682|回复: 0

[经验分享] Hadoop Combiner 操作

[复制链接]

尚未签到

发表于 2016-12-4 09:59:47 | 显示全部楼层 |阅读模式
  近期看了一本书:Data-intensive Text Processing with MapReduce,是讲如何设计MR程序的,看到一个例子是Combiner的设计模式,然后就动手实现了下。具体问题如下:
  现有输入数据如下:

one3.9
one4.0
one3.8
two44
two44
two44
three9898
four2323
four2323
five2323
six23
six2323
four232
five2323


第一列代表用户,第二列代表用户在一个网站上所停留的时间,现在想求每个用户在这个网站的平均停留时间。如果不用combine操作的话,那么其MR伪代码如下(复制书上的内容):
class Mapper
method Map(string t, integer r)
Emit(string t, integer r)
class Reducer
method Reduce(string t, integers [r1 , r2 , . . .])
sum ← 0
cnt ← 0
for all integer r ∈ integers [r1 , r2 , . . .] do
sum ← sum + r
cnt ← cnt + 1
ravg ← sum/cnt
Emit(string t, integer ravg )
如果要加combine怎么操作呢?Combiner和Reducer一样么(求最大气温的例子或许是一样的,但这里却不是,而且现实中的很多例子都不是一样的),如果一样的话那么就会变成下面的错误操作了:
Mean(1, 2, 3, 4, 5) = Mean(Mean(1, 2), Mean(3, 4, 5))
正确的伪代码如下(书上摘录):
class Mapper
method Map(string t, integer r)
Emit(string t, pair (r, 1))
class Combiner
method Combine(string t, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
sum ← 0
cnt ← 0
for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
sum ← sum + s
cnt ← cnt + c
Emit(string t, pair (sum, cnt))
class Reducer
method Reduce(string t, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
sum ← 0
cnt ← 0
for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
sum ← sum + s
cnt ← cnt + c
ravg ← sum/cnt
Emit(string t, integer ravg )
由于Combiner的输入和输出格式要一样,即Combiner的输入要和Mapper的输出格式一样,Combiner的输出要和Reducer的输入格式一样。所以上面有pairs。参考上面的伪代码编写的代码如下:  Driver:

package org.fansy.date922;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageDriver3 {
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf1 = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: AverageDriver<in> <out>");
System.exit(2);
}
Job job1 = new Job(conf1, "AverageDriver  job ");
job1.setInputFormatClass(KeyValueTextInputFormat.class);   
job1.setNumReduceTasks(1);
job1.setJarByClass(AverageDriver3.class);
job1.setMapperClass(AverageM2.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(TextPair.class);
job1.setCombinerClass(AverageC3.class);
job1.setReducerClass(AverageR2.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(DoubleWritable.class);
KeyValueTextInputFormat.addInputPath(job1, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));   
if(!job1.waitForCompletion(true)){
System.exit(1); // run error then exit
}  
System.out.println("************************");
}
}

Mapper:
package org.fansy.date922;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AverageM3 extends Mapper<Text,Text,Text,TextPair>{
//private Text newkey=new Text();
private TextPair newvalue=new TextPair();
private DoubleWritable r=new DoubleWritable();
private IntWritable number=new IntWritable(1);
public  void map(Text key,Text value,Context context)throws IOException,InterruptedException {
// TODO Auto-generated method stub
System.out.println(key.toString());
double shuzhi=Double.parseDouble(value.toString());
r.set(shuzhi);
newvalue.set(r, number);
context.write(key, newvalue);
}
}

Combiner:
package org.fansy.date922;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AverageC3 extends Reducer<Text,TextPair,Text,TextPair>{
private DoubleWritable newvalued=new DoubleWritable();
private IntWritable newvaluei=new IntWritable();
private TextPair newvalue=new TextPair();
public  void reduce(Text key,Iterable<TextPair> values,Context context) throws IOException,InterruptedException{
// TODO Auto-generated method stub
double sum= 0.0;
int num=0;
for(TextPair val:values){
sum+=val.getFirst().get();
num+=val.getSecond().get();
}
newvalued.set(sum);
newvaluei.set(num);
newvalue.set(newvalued,newvaluei);
context.write(key, newvalue);
}
}


Reducer:
package org.fansy.date922;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AverageR3 extends Reducer<Text,TextPair,Text,DoubleWritable>{
private DoubleWritable newvalue=new DoubleWritable();
public  void reduce(Text key,Iterable<TextPair> values,Context context) throws IOException,InterruptedException{
// TODO Auto-generated method stub
double sum= 0.0;
int num=0;
for(TextPair val:values){
sum+=val.getFirst().get();
num+=val.getSecond().get();
}
double aver=sum/num;
newvalue.set(aver);
context.write(key, newvalue);
}
}


TextPair:
package org.fansy.date922;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.WritableComparable;
public class TextPair implements WritableComparable<TextPair> {
private DoubleWritable first;
private IntWritable second;
public TextPair(){
set(new DoubleWritable(),new IntWritable());
}
public  void set(DoubleWritable longWritable, IntWritable intWritable) {
// TODO Auto-generated method stub
this.first=longWritable;
this.second=intWritable;
}
public DoubleWritable getFirst(){
return first;
}
public IntWritable getSecond(){
return second;
}
@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
first.readFields(arg0);
second.readFields(arg0);
}
@Override
public void write(DataOutput arg0) throws IOException {
// TODO Auto-generated method stub
first.write(arg0);
second.write(arg0);
}
@Override
public int compareTo(TextPair o) {
// TODO Auto-generated method stub
int cmp=first.compareTo(o.first);
if(cmp!=0){
return cmp;
}
return second.compareTo(o.second);
}
}



查看终端中的显示也可以看出的确是有combine操作的:
12/09/22 15:55:45 INFO mapred.JobClient: Job complete: job_local_0001
12/09/22 15:55:45 INFO mapred.JobClient: Counters: 22
12/09/22 15:55:45 INFO mapred.JobClient:   File Output Format Counters
12/09/22 15:55:45 INFO mapred.JobClient:     Bytes Written=65
12/09/22 15:55:45 INFO mapred.JobClient:   FileSystemCounters
12/09/22 15:55:45 INFO mapred.JobClient:     FILE_BYTES_READ=466
12/09/22 15:55:45 INFO mapred.JobClient:     HDFS_BYTES_READ=244
12/09/22 15:55:45 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=82758
12/09/22 15:55:45 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=65
12/09/22 15:55:45 INFO mapred.JobClient:   File Input Format Counters
12/09/22 15:55:45 INFO mapred.JobClient:     Bytes Read=122
12/09/22 15:55:45 INFO mapred.JobClient:   Map-Reduce Framework
12/09/22 15:55:45 INFO mapred.JobClient:     Map output materialized bytes=118
12/09/22 15:55:45 INFO mapred.JobClient:     Map input records=14
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/09/22 15:55:45 INFO mapred.JobClient:     Spilled Records=12
12/09/22 15:55:45 INFO mapred.JobClient:     Map output bytes=231
12/09/22 15:55:45 INFO mapred.JobClient:     Total committed heap usage (bytes)=301727744
12/09/22 15:55:45 INFO mapred.JobClient:     CPU time spent (ms)=0
12/09/22 15:55:45 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/09/22 15:55:45 INFO mapred.JobClient:     Combine input records=14
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce input records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce input groups=6
12/09/22 15:55:45 INFO mapred.JobClient:    Combine output records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce output records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
12/09/22 15:55:45 INFO mapred.JobClient:     Map output records=14
************************那本书上面其实最后还有提到一个 in-Mapper Combining的一个编程,但是看的不是很明白,伪代码如下:
class Mapper
method Initialize
S ← new AssociativeArray
C ← new AssociativeArray
method Map(string t, integer r)
S{t} ← S{t} + r
C{t} ← C{t} + 1
method Close
for all term t ∈ S do
Emit(term t, pair (S{t}, C{t}))



继续学习 MR编程中。。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-309414-1-1.html 上篇帖子: Hadoop权威指南摘抄(初识Hadoop) 下篇帖子: Hadoop 学习之三
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表