设为首页 收藏本站
查看: 1399|回复: 0

[经验分享] 解析hadoop框架下的Map-Reduce job的输出格式的实现

[复制链接]

尚未签到

发表于 2016-12-13 07:16:20 | 显示全部楼层 |阅读模式
  
Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。
  研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径:

FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
  从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。
  设置输入、输出格式:

job.setInputFormat(SequenceFileInputFormat.class);
job.setOutputFormat(MapFileOutputFormat.class);
  输出格式

  看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。
  接口
:org.apache.hadoop.mapred.OutputFormat<K
,
V
>

public interface OutputFormat<K, V> {
RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress)
throws IOException;
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}

  checkOutputSpecs
:检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。
   getRecordWriter
    :把输出键值对 output <key, value> 写入到输出路径中。
  mapred下面的实现有三个,如下图:
DSC0000.png
  基类FileOutputFormat
:org.apache.hadoop.mapred.FileOutputFormat

public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress)
throws IOException;
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException,
InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
// check its existence
if (fs.exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +  " already exists");
}
}
}
  这是个抽象类,实现了检查输入路径是否存在的方法,具体输出方式写成抽象方法预留给了子类。
  子类见下图:
DSC0001.png
  

  子类MapFileOutputFormat
:org.apache.hadoop.mapred.MapFileOutputFormat

public class MapFileOutputFormat
extends FileOutputFormat<WritableComparable, Writable> {
public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
// get the path of the temporary output file
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
new MapFile.Writer(job, fs, file.toString(),
job.getOutputKeyClass().asSubclass(WritableComparable.class),
job.getOutputValueClass().asSubclass(Writable.class),
compressionType, codec,
progress);
return new RecordWriter<WritableComparable, Writable>() {
public void write(WritableComparable key, Writable value)
throws IOException {
out.append(key, value);
}
public void close(Reporter reporter) throws IOException { out.close();}
};
}
}

  关键点在于获取分布式文件输出句柄MapFile.Writer,完成输出任务后会关闭输出。每个实现都有特定用途,都需要弄清楚,在这里就不再一一介绍了。
  上面是hadoop自己的实现,在具体的编程过程中,我们肯定会有自己的实现去定义输出格式。上面也讲到了job只能设置输出路径,不能添加多个输出路径,那么有什么解决措施呢?来看看nutch中的精彩实现,会给我们启示:
  自己的实现:
org.apache.nutch.parse.ParseOutputFormat

public class ParseOutputFormat implements OutputFormat<Text, Parse> {
//这里不是检查输出路径,是检查数据路径下的子路径,改变了接口中的定义
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
throw new IOException("Segment already parsed!");
}
//下面获取了三个输入句柄,分别向三个路径中输出键值对
public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
 throws IOException {
......
Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // 一个输出路径
 Path data = new Path(new Path(out, ParseData.DIR_NAME), name); //两个输出路径
 Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//三个输出路径
    
//一个写入
final MapFile.Writer textOut =
new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
CompressionType.RECORD, progress);
//第二个写入
final MapFile.Writer dataOut =
new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
compType, progress);
//第三个写入
final SequenceFile.Writer crawlOut =
SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
compType, progress);
return new RecordWriter<Text, Parse>() {
public void write(Text key, Parse parse)throws IOException {
......
crawlOut.append(key, d);
.......
crawlOut.append(new Text(newUrl), newDatum);
......
crawlOut.append(key, adjust);
......
dataOut.append(key, parseData);
......
crawlOut.append(key, datum);
}
}
//关闭三个句柄
public void close(Reporter reporter) throws IOException {
textOut.close();
dataOut.close();
crawlOut.close();
}
};
}
}
  ParseOutputFormat实现了OutputFormat接口,改变了job中设置的输出路径,并且把不同的内容输出到不同的路径,从而达到了多个输出(并且根据逻辑划分)。这个我觉得值得借鉴。
  关于输入以及输入输出的各个实现都有什么用处,以后有机会再来写写。本人现在还是一知半解,见笑了。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-313373-1-1.html 上篇帖子: Hadoop是Apache提出的一个软件框架(即:开放源码并行运算编程工具和分布式文件系统,与MapReduce和Google档案系统的概念类似) 下篇帖子: hadoop shuffle机制中针对中间数据的排序过程详解(源代码级)---转载
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表