设为首页 收藏本站
查看: 952|回复: 0

[经验分享] hadoop mr数据流总结

[复制链接]

尚未签到

发表于 2016-12-8 07:10:02 | 显示全部楼层 |阅读模式
hadoop mr数据流
/*
符注:
()内为数据;[]内为处理;
{}内为框架模块;
()数据若无说明则为在内存;
->本机数据流;=>网络数据流;~>分布式-本地读写数据流;
/**/为标注;
*/
(分布式源文件)~>{JobTracker分配到各TaskTracker本机上}=>
-------------------------------- @TaskTracker(Map) machine
(split)->[InputFormat]->
[RecordReader]迭代begin
(k1,v1)->[map]->
(k2,v2)->[partition]->
[RecordReader]迭代end->
(k2,v2内存集合1)->[sort,merge]->
(k2,v2内存排序集合)->
[combine]迭代begin/*若有partition则按其规则*/
(k2,iter(v2))->[combine]->
(k3,v3)->
[combine]迭代end->
(k3,iter(v3)本地文件)->
-------------------------------- @TaskTracker(Map) machine
[shuffle]=>
-------------------------------- @TaskTracker(Reduce) machine
(k3,iter(v3)来自各mapper的子集)->[sort,merge]->
(k3,iter(v3)来自各mapper的合集)->[reduce]->
(k4,v4)->[OutputFormat]~>
-------------------------------- @TaskTracker(Reduce) machine
(分布式结果文件)

======================================
总结mapreduce数据处理流程:
所谓分布式计算在hadoop的实现里可表达为:
1.基于hdfs分布式存储的各存储节点的map运算过程;
2.之后的在少量(甚至唯一)节点上的reduce运算过程;
3.以及连接map运算输出和reduce输入的shuffle过程;
下图表达在某datanode机器节点进行map和shuffle的过程:
DSC0000.jpg
InputFomat:从block到split到(k1,v1)
hdfs中文件是按照配置大小(默认64M)分block存储n份(一般为)到n个独立datanode节点的;
当要解析某分布式文件,要执行map任务的TaskTracker将读取存储于本机的相关block进行本地文件解析;
个人理解是一个block可以被拆分为多个split,每个split作为一个map tasktracker的输入(但如何切分还没搞清楚),我所涉及的所有mr测试中都是一个block对应一个map任务.
这个split可以通过FSDataInputStream输入流读取,这是一个本地文件读取操作.
此过程中可以编写自己的InputFormat进行自定义的读取,此类功能的核心是返回一个RecordReader,RR是具体解析文件逻辑实现类:
对于hadoop0.20及以前版本,对应hadoop core jar里的 org.apache.hadoop.mapred包,RR我理解是一种被动模式,从其接口函数命名看,主要实现如下函数:
createKey()
createValue()
getPos()
next(Text key, Text value)
这个RecordReader接口具体实现类的调用者,应是先调用createKey和createValue来实例化key,value对象(这里是为初始化自定义对象考虑的),然后再调用next(key,value)来填充这两个对象而得到解析的结果.
对于hadoop1.0及之后的版本,对应hadoop core jar里的 org.apache.hadoop.mapreduce包,RR是一种主动模式,主要实现如下函数:
getCurrentKey()
getCurrentValue()
nextKeyValue()
getPos()
isSplitable(JobContext context, Path file)
此接口调用者,应是直接调用nextKeyValue()来获取key value实例,然后通过两个getCurrent方法由调用者获取其对应实例;
无论是next(Text key, Text value),还是nextKeyValue(),split被RR解析为一条的(k1,v1)
MapClass.map():从(k1,v1)到(k2,v2)
不多说.
Partition.partition():给(k2,v2)盖章(指定reduce)
经过此函数数据流中的k,v是不变的,函数相当于给此k,v对盖个章,指定其要去的reduce.
值得注意的是,经过RR出来的(k1,v1)是先顺序经过map和partition,而非全过完map之后再全过partition.
所有(k2,v2)写入membuffer.
经过InputFormat中RR的next/nextKeyValue迭代,形成的系列(k2,v2)会被写入到内存的buffer中,此buffer大小通过io.sort.mb参数指定.
另为了控制资源,会有另一个进程来监控此buffer容量,当实际容量达到/超过此buffer某百分比时,将发生spill操作,此百分比通过io.sort.spill.percent参数设定.
此监控进程与写入独立,所以不会影响写入速度.
spill:mem buffer中的(k2,v2)落地为local disk的文件
当buffer实际容量超过门槛限制,或者split所有数据经RR next/nextKeyValue迭代完成时,均触发spill操作,所以每个map任务流中至少会有一次spill.
spill作用是将buffer里的kv对sort到本地磁盘文件.
sort:针对spill kv数据的排序
在spill时,sort会参考partition时给数据所盖的章,即reduce号,另在reduce号相同的情况下再按照k2排序.这个排序至少有两个目的:
1.使得在有可能的combine中进行同key的迭代变的很容易;
2.使得shuffle后的reduce大流程中基于同key合并变得相对容易;
combine:从(k2,iter(v2))到(k3,v3)
combine类建立的初衷是尽量减少spill发生的磁盘写操作的量(拿网上通用的说法就是本地的reduce,也确实是实现了Reduce的接口/父类)
combine基于业务规则处理key及同key的所有value.这(k2,iter(v2))经处理后,仅返回一个(k3,v3),减少了spill磁盘写入量.
combine很有意思,特别是当它跟partition联合来使用时,在此我在理解上了也颇费了翻功夫:
partition给(k2,v2)盖章,决定了这个(k2,v2)具体要到哪个reduce去.而其后的combine处理,我完全随便写逻辑来处理同key的(k2,iter(v2)),大家是否也有跟我一样的疑问:假设我在partition里是根据value的某种规则来决定reduce号,但到了combine中,某个(k2,v2)却要跟其他很多(k2,v2')进行处理,而其他(k2,v2')并不一定会在partition根据规则归到同一个reduce号上,那combine却笼统的返回了一个(k3,v3),那么这个(k3,v3)到底是会给哪个reduce呢?
我理解hadoop在这里的机制是,在spill时,首先按照partition的reduce号来将分到同reduce的(k2,v2)放到一起,在此前提下,再进行同key的sort,最终写入到一个与reduce号相关的spill file中(位于local disk).我还没有去看具体代码,暂时把这个理解写这里.
另对于这个问题,网上有文章(http://langyu.iyunv.com/blog/992916)提及:"Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。"
这位作者文章很精彩,我很赞同以上的说法,但,实际上我认为运用spill的这种特性,可以完成二级(或者多级)统计的功能(见后面一个测试的例子).
merge and sort:将多个map的spill文件按找partition分配的reduce号归并
首先是按照reduce号进行分组,分组内将按照key(即k3)排序.
shuffle:洗牌,不如称之为摸牌
当一个TaskTracker执行完了map任务,他将完成状态汇报给JobTracker.这时(我认为)JobTracker会让分配reduce的TaskTracker来获取这个map生成的文件,每个TaskTracker获取它所执行reduce对应的那份.
merge and sort:reduce端的
当TaskTracker从各map机上取得属于自己的文件后,要执行merge和sort过程,即按照k3进行排序,这样有利于通过一次遍历就可以达到输入(k3,iter(v3))的效果.
reduce:从(k3,iter(v3))到(k4,v4)并写入至分布式文件系统
每个reduce会生成一个分布式文件,放置于任务目录下.


附:一个测试的例子:
...经过map的处理到partition,
partition的输入(k2,v2)其中k2的含义是书籍出版的年份m个(比如,1999/2010/2012/...),v2的含义是书的分类n个(比如,技术/文学/艺术/...)
partition的处理是根据value来映射到reduce,即每中书籍分类让一个reduce处理.
combine的输入(k2,iter(v2)),输出(k3,v3)其中k3=k2;v3=count(iter(v2));
reduce的输入是(k3,iter(v3)),输出(k4,v4),其中k4=k3;v4=sum(iter(v2));
整个mr执行完之后,按书籍分类输出n个文件,每个文件里根据年份输出该年份(该分类)的书籍总数;
达到做二级分类统计的目的

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311071-1-1.html 上篇帖子: hadoop 10 tip(转载) 下篇帖子: hadoop、hbase、zookeeper环境搭建(详细)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表