设为首页 收藏本站
查看: 646|回复: 0

[经验分享] hadoop多表关联

[复制链接]

尚未签到

发表于 2016-12-4 11:25:27 | 显示全部楼层 |阅读模式

2.多表关联

多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。
实例描述
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名地址名对应关系,输出"工厂名——地址名"表。
样例输入如下所示。
1)factory:

factoryname     addressed
Beijing Red Star     1
Shenzhen Thunder     3
Guangzhou Honda     2
Beijing Rising     1
Guangzhou Development Bank2
Tencent         3
Back of Beijing      1

2)address:

addressID addressname
1     Beijing
2     Guangzhou
3     Shenzhen
4     Xian

样例输出如下所示。

factoryname     addressname
Back of Beijing      Beijing
Beijing Red Star     Beijing
Beijing Rising       Beijing
Guangzhou Development BankGuangzhou
Guangzhou Honda     Guangzhou
Shenzhen Thunder     Shenzhen
Tencent         Shenzhen

设计思路
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
这个实例的具体分析参考单表关联实例。下面给出代码。
程序代码
程序代码如下所示:

packagecom.hebut.mr;

importjava.io.IOException;
importjava.util.*;

importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
importorg.apache.hadoop.util.GenericOptionsParser;

publicclassMTjoin{

publicstaticinttime=0;

/*
*map中先区分输入行属于左表还是右表,然后对两列值进行分割,
*保存连接列在key值,剩余列和左右表标志在value中,最后输出
*/
publicstaticclassMapextendsMapper<Object,Text, Text, Text> {

//实现map函数
publicvoidmap(Objectkey, Text value, Context context)
throwsIOException,InterruptedException {
String line = value.toString();//每行文件
String relationtype =newString();//左右表标识

//输入文件首行,不处理
if(line.contains("factoryname")==true
|| line.contains("addressed")==true) {
return;
}

//输入的一行预处理文本
StringTokenizer itr =newStringTokenizer(line);
String mapkey =newString();
String mapvalue =newString();
inti= 0;
while(itr.hasMoreTokens()){
//先读取一个单词
String token = itr.nextToken();
//判断该地址ID就把存到"values[0]"
if(token.charAt(0)>='0'&& token.charAt(0) <='9') {
mapkey = token;
if(i> 0) {
relationtype ="1";
}else{
relationtype ="2";
}
continue;
}

//存工厂名
mapvalue += token +" ";
i++;
}

//输出左右表
context.write(newText(mapkey),newText(relationtype+"+"+ mapvalue));
}
}

/*
* reduce解析map输出,将value中数据按照左右表分别保存,
  *然后求出笛卡尔积,并输出。
*/
publicstaticclassReduceextendsReducer<Text,Text, Text, Text> {

//实现reduce函数
publicvoidreduce(Textkey, Iterable<Text> values, Context context)
throwsIOException,InterruptedException {

//输出表头
if(0==time) {
context.write(newText("factoryname"),newText("addressname"));
time++;
}

intfactorynum= 0;
String[] factory =newString[10];
intaddressnum= 0;
String[]address=newString[10];

Iteratorite= values.iterator();
while(ite.hasNext()){
String record = ite.next().toString();
intlen= record.length();
inti= 2;
if(0== len) {
continue;
}

//取得左右表标识
charrelationtype= record.charAt(0);

//左表
if('1'==relationtype) {
factory[factorynum] = record.substring(i);
factorynum++;
}

//右表
if('2'==relationtype) {
address[addressnum]= record.substring(i);
addressnum++;
}
}

//求笛卡尔积
if(0!= factorynum && 0 != addressnum) {
for(intm= 0; m < factorynum; m++) {
for(intn= 0; n < addressnum; n++) {
//输出结果
context.write(newText(factory[m]),
newText(address[n]));
}
}
}

}
}

publicstaticvoidmain(String[]args)throwsException {
Configuration conf =newConfiguration();
//这句话很关键
conf.set("mapred.job.tracker","192.168.1.2:9001");

String[] ioArgs =newString[]{"MTjoin_in","MTjoin_out"};
String[] otherArgs =newGenericOptionsParser(conf,ioArgs).getRemainingArgs();
if(otherArgs.length!=2) {
System.err.println("Usage:Multiple Table Join <in> <out>");
System.exit(2);
}

Job job =newJob(conf,"MultipleTable Join");
job.setJarByClass(MTjoin.class);

//设置MapReduce处理类
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

//设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

//设置输入和输出目录
FileInputFormat.addInputPath(job,newPath(otherArgs[0]));
FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-309520-1-1.html 上篇帖子: Hadoop 权限管理 下篇帖子: hadoop调度器
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表