设为首页 收藏本站
查看: 608|回复: 0

[经验分享] Hadoop实现Secondary Sort (转)

[复制链接]

尚未签到

发表于 2016-12-6 08:15:58 | 显示全部楼层 |阅读模式
在hadoop中每个reduce的输入的key都是有序的,而value则是无序的。而且同一个job运行多次,由于map完成顺序不同,reduce收到的value顺序是不固定的。那如何才能实现reduce收到有序的value呢?这就需要Secondary Sort。
Secondary Sort要解决的问题:reduce收到的value有序。
这里举一个场景,来说明Secondary Sort是如何实现的。假设我们有若干公司若干部门的人数,数据样例如下:
 
 
  公司名   部门的人数

Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3

我们想知道每个公司的最大部门(人数最多)的人数。即希望先按公司名group,然后对group内的人数降序排列,最后取每个group的第一个即可。
由于reduce收到的value是无序的,所以要对value进行排序,首先需要将value封装到key里面。即需要自定义key的类型,代码如下:
 
[java] view plaincopy 



  • <span style="font-family:Microsoft YaHei;font-size:18px;">import java.io.DataInput;  
  • import java.io.DataOutput;  
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.io.IntWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.io.WritableComparable;  
  •   
  • public class MyKey implements WritableComparable<MyKey> {  
  •   public final Text first;  
  •   public final IntWritable second;  
  •   
  •   public MyKey() {  
  •     first = new Text();  
  •     second = new IntWritable();  
  •   }  
  •   
  •   public MyKey(Text first, IntWritable second) {  
  •     this.first = first;  
  •     this.second = second;  
  •   }  
  •   
  •   @Override  
  •   public void write(DataOutput out) throws IOException {  
  •     first.write(out);  
  •     second.write(out);  
  •   }  
  •   
  •   @Override  
  •   public void readFields(DataInput in) throws IOException {  
  •     first.readFields(in);  
  •     second.readFields(in);  
  •   }  
  •   
  •   @Override  
  •   public String toString() {  
  •     return first + "\t" + second;  
  •   }  
  •   
  •   @Override  
  •   public int compareTo(MyKey tp) {  
  •     int cmp = first.compareTo(tp.first);  
  •     if (cmp != 0) {  
  •       return cmp;  
  •     }  
  •     return -second.compareTo(tp.second);  
  •   }  
  • }</span>  

  
这里新定义的类型为MyKey封装了一个Text和一个IntWritable,依次存放公司名和部门人数。Hadoop要求key的类型必须实现Writable和Comparable,前者为了支持序列化和反序列化,后者为了实现基于比较的排序。需要注意的是compareTo()方法中先按first即公司名升序排列,后按second即部门人数降序排列。另外toString()方法的实现是为了定义输出格式,即公司名+tab+最大部门人数。

 
定义key后还不能满足需求。因为默认的HashPartitioner会将相同的key分配给同一个reduce,而我们希望的是first相同的key分给同一个reduce处理,默认的Partitioner显然保证不了这一点。这就需要我们自定义Partitioner,实现first相同的key分配给同一个reduce。实现代码如下:
 
[java] view plaincopy 



  • <span style="font-family:Microsoft YaHei;font-size:18px;">import org.apache.hadoop.io.NullWritable;  
  • import org.apache.hadoop.mapred.JobConf;  
  • import org.apache.hadoop.mapred.Partitioner;  
  •   
  •   
  • public class MyPartitioner   
  •   implements Partitioner<MyKey, NullWritable> {  
  •   
  •   @Override  
  •   public void configure(JobConf job) {}  
  •   
  •   @Override  
  •   public int getPartition(MyKey key, NullWritable value, int numPartitions) {  
  •     return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;  
  •   }  
  • }  
  • </span>  

  
MyPartitioner的getPartition()方法中,只考虑first,不考虑second,这样就满足了我们的需求。

 
实现到这一步后,reduce会获取到按first升序且按second降序排列的key序列。而我们希望first相同的key中,只获取第一个的second即可,其他数据可以忽略。这就需要数据执行reduce前按照key的first字段进行归并,即grouping。first相同的key归为一个group,将第一个key和所有的value(value为NullWritable类型,无需处理)传给reduce()方法。然后reduce将key输出即可实现目的。为了实现这样的grouping操纵,需要自定义归并比较器(ValueGroupingComparator),代码如下:
 
[java] view plaincopy 



  • <span style="font-family:Microsoft YaHei;font-size:18px;">import org.apache.hadoop.io.WritableComparable;  
  • import org.apache.hadoop.io.WritableComparator;  
  •   
  • public class MyGroupComparator extends WritableComparator {  
  •   protected MyGroupComparator() {  
  •     super(MyKey.classtrue);  
  •   }  
  •     
  •   @Override  
  •   public int compare(WritableComparable w1, WritableComparable w2) {  
  •     MyKey m1 = (MyKey) w1;  
  •     MyKey m2 = (MyKey) w2;  
  •     return m1.first.compareTo(m2.first);  
  •   }  
  •   
  • }  
  • </span>  

  
从MyGroupComparator代码中可以看出,compare中只比较firest而忽略second。

 
以上模块自定义好后,map和reduce实现会相当容易。map只需要将公司名和部门人数构造成一个MyKey对象即可。而reduce中将收到的key输出就ok了。实现SecondarySort的作业代码如下:
 
[java] view plaincopy 



  • <span style="font-family:Microsoft YaHei;font-size:18px;">import java.io.IOException;  
  • import java.util.Iterator;  
  •   
  • import org.apache.hadoop.conf.Configuration;  
  • import org.apache.hadoop.conf.Configured;  
  • import org.apache.hadoop.fs.Path;  
  • import org.apache.hadoop.io.*;  
  • import org.apache.hadoop.mapred.*;  
  • import org.apache.hadoop.util.Tool;  
  • import org.apache.hadoop.util.ToolRunner;  
  •   
  • public class MySecondarySort extends Configured implements Tool{  
  •     
  •   public static class MyMap extends MapReduceBase  
  •     implements Mapper<Text, Text, MyKey, NullWritable> {  
  •       
  •     private IntWritable num = new IntWritable();  
  •       
  •     @Override  
  •     public void map(Text key, Text value,   
  •                     OutputCollector<MyKey, NullWritable> output,   
  •                     Reporter reporter) throws IOException {  
  •   
  •       num.set(Integer.parseInt(value.toString()));  
  •       MyKey myKey = new MyKey(key, num);  
  •       output.collect(myKey, NullWritable.get());  
  •     }  
  •   }  
  •     
  •   public static class MyReduce extends MapReduceBase  
  •     implements Reducer<MyKey, NullWritable, MyKey, NullWritable> {  
  •   
  •     @Override    
  •     public void reduce(MyKey key, Iterator<NullWritable> values,  
  •                        OutputCollector<MyKey, NullWritable> output,   
  •                        Reporter reporter) throws IOException {  
  •       output.collect(key, NullWritable.get());  
  •     }  
  •   }  
  •     
  •     
  •   @Override  
  •   public int run(String[] args) throws Exception {  
  •     JobConf conf = new JobConf(getConf(), MySecondarySort.class);   
  •     conf.setJobName("wordcount");  
  •    
  •     conf.setOutputKeyClass(MyKey.class);  
  •     conf.setOutputValueClass(NullWritable.class);  
  •       
  •     conf.setMapperClass(MyMap.class);          
  •     conf.setReducerClass(MyReduce.class);  
  •      
  •     conf.setPartitionerClass(MyPartitioner.class);  
  •     conf.setOutputValueGroupingComparator(MyGroupComparator.class);  
  •       
  •     conf.setInputFormat(KeyValueTextInputFormat.class);  
  •     conf.set("key.value.separator.in.input.line"" ");  
  •   
  •     FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  •     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  •     JobClient.runJob(conf);  
  •     return 0;  
  •   }  
  •     
  •   public static void main(String[] args) throws Exception {   
  •     int res = ToolRunner.run(new Configuration(), new MySecondarySort(), args);  
  •     System.exit(res);  
  •   }  
  • }  
  • </span>  

  
注意由于输入格式是key+空格+value,这里采用KeyValueTextInputFormat,避免了map中做分割字符串操作。

 
对于输入如下内容的文件:
 
$ bin/hadoopfs -cat /liangly/list
Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3
  执行上面实现的Job:
 
$ bin/hadoopjar job.jar MySecondarySort \
> -Dmapred.map.tasks=3 \
> -Dmapred.reduce.tasks=2 \
> /liangly/list \
> /liangly/out
  作业结束后输出如下:
 
$ bin/hadoopfs -cat /liangly/out/*
Alipay  36
Aliyun 32
B2B    120
  Taobao  67

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310204-1-1.html 上篇帖子: Hadoop学习笔记之WordCount 下篇帖子: hadoop常见配置含义
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表