设为首页 收藏本站
查看: 709|回复: 0

[经验分享] 如何使用Hadoop的Reduce Side Join

[复制链接]

尚未签到

发表于 2016-12-8 10:02:33 | 显示全部楼层 |阅读模式
我们都知道在数据库里,多个表之间是可以根据某个链接键进行join的,这也是数据库的范式规范,通过主外键的关联,由此来减少数据冗余,提高性能。当然近几年,随着NOSQL的兴起,出现了基于列的的列式数据库,典型的有Hbase,MongonDB,Cassdran,等等,NOSQL数据库弱化了关联,直接将一整条数据,存入一列,以及去掉了数据库的部分事务特性,从而在海量数据面前显得游刃有余,当然,大部分的NOSQL不支持join操作,也没有绝对的必要支持,因为现在,我们完全是把一整条数据存在了一起,虽然多了许多冗余,但也换来了比较高检索性能,扩展性能,可靠性能。但某些业务场景下,我们仍然需要Join操作,这时候怎么办?
如果数据量比较大的情况下,我们可以使用Hadoop的MapReduce来完成大表join,尤其对Hbase的某些表进行join操作,当然我们也可以使用Hive或Pig来完成,其实质在后台还是运行的一个MR程序。
那么,散仙今天就来看下如何使用MapReduce来完成一个join操作,Hadoop的join分为很多种例如;Reduce链接,Map侧链接,半链接和Reduce侧链接+BloomFilter等等,各个链接都有自己特定的应用场景,没有绝对的谁好谁坏。

今天散仙要说的是,基于Reduce侧的链接,原理如下:
1、在Reudce端进行连接。
   在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。
本次的实现是基于hadoop的旧版API+contribu扩展包里的,DataJoin的工具类辅助来完成的,下篇博客,将会给出,基于新版API,独立来完成Reduce侧的连接示例。

现在看下散仙的两个文件的测试数据,一个是a.txt,另一个是b.txt

a文件的数据
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862



b文件的数据
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07

源码如下:
package com.qin.reducejoin;

import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import java.util.Iterator;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapred.FileInputFormat;  
import org.apache.hadoop.mapred.FileOutputFormat;  
import org.apache.hadoop.mapred.JobClient;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapred.KeyValueTextInputFormat;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.Mapper;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reducer;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.hadoop.mapred.TextInputFormat;  
import org.apache.hadoop.mapred.TextOutputFormat;  
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;  
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;  
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;  
import com.qin.joinreduceerror.JoinReduce;

/***
*
* Hadoop1.2的版本,旧版本实现的Reduce侧连接
*
* @author qindongliang
*
*    大数据交流群:376932160
*  搜索技术交流群:324714439
*
*
*/
public class DataJoin extends Configured implements Tool {  

/**
*
* Map实现
*
* */
public static class MapClass extends DataJoinMapperBase {  
/**
* 读取输入的文件路径
*
* **/
protected Text generateInputTag(String inputFile) {  
//返回文件路径,做标记
return new Text(inputFile);  
}  

/***
* 分组的Key
*
* **/
protected Text generateGroupKey(TaggedMapOutput aRecord) {  
String line = ((Text) aRecord.getData()).toString();  
String[] tokens = line.split(",");  
String groupKey = tokens[0];  
return new Text(groupKey);  
}  


protected TaggedMapOutput generateTaggedMapOutput(Object value) {  
TaggedWritable retv = new TaggedWritable((Text) value);  
retv.setTag(this.inputTag);  
return retv;  
}  
}  
/**
*
* Reduce进行笛卡尔积
*
* **/
public static class Reduce extends DataJoinReducerBase {  

/***
* 笛卡尔积
*
* */
protected TaggedMapOutput combine(Object[] tags, Object[] values) {  
if (tags.length < 2) return null;   
String joinedStr = "";   
for (int i=0; i<values.length; i++) {  
if (i > 0) {joinedStr += ",";}  
TaggedWritable tw = (TaggedWritable) values;  
String line = ((Text) tw.getData()).toString();  
String[] tokens = line.split(",", 2);  
joinedStr += tokens[1];  
}  
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));  
retv.setTag((Text) tags[0]);   
return retv;  
}  
}  
/**
*
* 自定义的输出类型
*
* ***/
public static class TaggedWritable extends TaggedMapOutput {  
private Writable data;  
/**
* 注意加上构造方法
*
* */
public TaggedWritable() {
// TODO Auto-generated constructor stub
}
public TaggedWritable(Writable data) {  
this.tag = new Text("");  
this.data = data;  
}  
public Writable getData() {  
return data;  
}  
public void write(DataOutput out) throws IOException {  
this.tag.write(out);  
//此行代码很重要
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}  
public void readFields(DataInput in) throws IOException {  
this.tag.readFields(in);  
//加入此部分代码,否则,可能报空指针异常
String temp=in.readUTF();
if (this.data == null|| !this.data.getClass().getName().equals(temp)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(temp), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);  
}  
}  
public int run(String[] args) throws Exception {  
Configuration conf = getConf();  
JobConf job = new JobConf(conf, DataJoin.class);  
job.set("mapred.job.tracker","192.168.75.130:9001");
////读取person中的数据字段
job.setJar("tt.jar");
job.setJarByClass(DataJoin.class);
System.out.println("模式:  "+job.get("mapred.job.tracker"));;


String path="hdfs://192.168.75.130:9000/root/outputjoindb";
FileSystem fs=FileSystem.get(conf);
Path p=new Path(path);
if(fs.exists(p)){
fs.delete(p, true);
System.out.println("输出路径存在,已删除!");
}

Path in = new Path("hdfs://192.168.75.130:9000/root/inputjoindb");  
//  Path out = new Path(args[1]);  
FileInputFormat.setInputPaths(job, in);  
FileOutputFormat.setOutputPath(job, p);  
job.setJobName("cee");  
job.setMapperClass(MapClass.class);  
job.setReducerClass(Reduce.class);  
job.setInputFormat(TextInputFormat.class);  
job.setOutputFormat(TextOutputFormat.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(TaggedWritable.class);  
job.set("mapred.textoutputformat.separator", ",");  
JobClient.runJob(job);   
return 0;  
}  
public static void main(String[] args) throws Exception {   

int res = ToolRunner.run(new Configuration(),  
new DataJoin(),  
args);  
System.exit(res);  
}  
}  

运行,日志
模式:  192.168.75.130:9001
输出路径存在,已删除!
INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - FileInputFormat.listStatus(199) | Total input paths to process : 2
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0025
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 33% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0025
INFO - Counters.log(585) | Counters: 30
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=14335
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=3
INFO - Counters.log(589) |     Data-local map tasks=3
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9868
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=207
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=837
INFO - Counters.log(589) |     HDFS_BYTES_READ=513
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=221032
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=849
INFO - Counters.log(589) |     Map input records=8
INFO - Counters.log(589) |     Reduce shuffle bytes=849
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=815
INFO - Counters.log(589) |     Total committed heap usage (bytes)=496644096
INFO - Counters.log(589) |     CPU time spent (ms)=2080
INFO - Counters.log(589) |     Map input bytes=187
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=306
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=623570944
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2908262400
INFO - Counters.log(589) |     Map output records=8

运行结果,如下图所示:
DSC0000.jpg
可以看出,MR正确的完成了join操作,需要注意的是Reduce侧连接的不足之处:
之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
另外一点需要注意的是,散仙在eclipse里进行调试,Local模式下会报异常,建议提交到hadoop的测试集群上进行测试。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311350-1-1.html 上篇帖子: Apache Hadoop 2.0.2-alpha比hadoop-1.x的改进之处 下篇帖子: 【hadoop-1.0】:启动hadoop时,log中出现:java.io.IOException: NameNode is not formatted.
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表