设为首页 收藏本站
查看: 828|回复: 0

[经验分享] [Hadoop] 分布式Join : Replicated Join

[复制链接]

尚未签到

发表于 2016-12-8 09:44:39 | 显示全部楼层 |阅读模式
  上一篇文章说的ReduceSide Join的一个缺点就是,在map方法之中,只对数据加了tag、提取了groupkey,没有做任何的数据过滤,这样在map-reduce之中的shuffle过程会造成大量的 磁盘IO使得效率降低。
  这次使用的是Replicated Join,完成的任务跟上次一样.
  它有一个前提:需要关联在一起的两个文件,其中一个文件比较小,至少能放到内存之中。
  其中一个关键的地方就是,在运行job之前,先将本地的小文件(简称为smallIn)上传到Hadoop集群的每一个服务器之中。在每个集群之中,大文件的split跟smallIn进行jion的操作。这样效率会比较高。
  上传的代码:

Path smallIn = new Path("...");
DistributedCache.addCacheFile(smallIn.toUri(), conf);
  除了这种方式,可以在命令行调用的时候自动上传:

bin/hadoop jar -files smallIn DataJoinDC.jar big_in.txt output
  我们对比两个文件的原始大小:
  u.user: 23KB
  u.data: 1933KB
  所以,显然我们应该选择u.user作为DistributedCache文件。
  在Map类之中,如果使用新的API,则在setup之中进行,如果是old api 则在configure()方法之中进行分解动作。
  获取Cache之中的文件的代码如下:

URI[] cachesFiles = DistributedCache.getCacheFiles(conf);
  这跟<Hadoop in Action> 上的代码不太一样,书上是调用 

Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf);
  一般这是在Local模式下进行的。
  Note:
  以上一部分是猜测!因为这个程序在我的环境Eclipse + Ubunut (Pseodu-Distributed) 模式没有成功,无法正确的获取到Cache文件!
  最后,贴上我的代码做个纪念,暂时保留这个问题:

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Main {
private static final String U_DATA_SEPARATOR = "\t";
private static final String U_USER_SEPARATOR = "[|]";
public static class Map extends Mapper<Text, Text, Text, Text> {
private Hashtable<String, String> table = new Hashtable<String, String>();
@Override
protected void map(Text key, Text value,
Mapper<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String dataInfo = value.toString();
if(dataInfo.trim().length() == 0) return;
if(table.containsKey(key.toString())) {
String userInfo = table.get(key.toString());
String[] userTokens = userInfo.split(U_USER_SEPARATOR);
String userData = "age=" + userTokens[0];
String[] dataTokens = dataInfo.split(U_DATA_SEPARATOR);
String ratingData = "ratings=" + dataTokens[1];
context.write(key, new Text(userData + "|" + ratingData));
}
}
@Override
protected void setup(Mapper<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
try {
Configuration conf = context.getConfiguration();
//Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf);
URI[] cachesFiles = DistributedCache.getCacheFiles(conf);
if(cachesFiles != null && cachesFiles.length > 0) {
Iterator<String> it = FileUtils.lineIterator(new File(cachesFiles[0].toString()));
while(it.hasNext()) {
String line = it.next();
System.out.println("~~~~~~~ line=" + line);
if(line.trim().length() == 0) continue;
String[] tokens = line.split(U_USER_SEPARATOR, 2);
table.put(tokens[0], tokens[1]);
}
} else {
System.out.println("!!!!!!!!!!!!!!!! empty cache files");
}
} catch (IOException e) {
e.printStackTrace();
}
}

}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJobName("Replicated Join");
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//job.setOutputKeyClass(Text.class);
//job.setOutputValueClass(TextOutputFormat.class);
FileSystem fs = FileSystem.get(conf);
//Path localSmallIn = new Path("/home/hadoop/DataSet/movielens/u.user");
Path hdfsSmallIn = new Path("/data/u.user");
//fs.copyFromLocalFile(true, localSmallIn , hdfsSmallIn);
//DistributedCache.addCacheFile(
//hdfsSmallIn.toUri(), conf);
Path bigIn = new Path("/home/hadoop/DataSet/movielens/u.data");
Path out = new Path("/home/hadoop/DataSet/movielens-Replicated-output");
//Path bigIn = new Path("/data/u.data");
//Path out = new Path("/data/movielens-Replicated-output");
if(fs.exists(out)) {
System.out.println("输出目录已经存在,将其删除~");
fs.delete(out, true);
}
FileInputFormat.setInputPaths(job, bigIn);
FileOutputFormat.setOutputPath(job, out);
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", U_DATA_SEPARATOR);
//job.set("key.value.separator.in.input.line", U_DATA_SEPARATOR);// for u.data
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311325-1-1.html 上篇帖子: 深入浅出Hadoop实战开发 下篇帖子: hadoop 2.0.1-alpha上部署 oozie-3.2.0
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表