设为首页 收藏本站
查看: 1649|回复: 0

[经验分享] 第七章:小朱笔记hadoop之源码分析-hdfs分析 第四节:namenode-ReplicationMonitor

[复制链接]

尚未签到

发表于 2016-12-13 10:26:00 | 显示全部楼层 |阅读模式
第四节:namenode分析

4.3 namenode 副本监控分析ReplicationMonitor

ReplicationMonitor主要有两个作用:
(1)负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作。
(2)将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。
默认每3s执行一次,可以通过修改dfs.replication.interval来调整执行间隔
(1)computeDatanodeWork
    计算datanode需要处理的replication数量,主要包括当前超时挂起的replication,需要进行复制的replication,计 划处理的replication,损坏的replication。将这些数据记录下来,在下次heartbeat时候通知给datanode处理。
(2)processPendingReplications
    处理超时挂起的replication,将超时的replication加入到需要进行replication操作的队列中
    重要的控制参数:
 

//限制每一次处理无效Blocks的数据节点数量  
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;   
//限制每一次处理冗余Blocks的数据节点数量  
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
 

重要的数据结构:
(1)UnderReplicatedBlocks neededReplications
     需要进行复制的数据块。UnderReplicatedBlocks是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先 级是0), 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。类 UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合, 副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将 block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;

(2)PendingReplicationBlocks pendingReplications
     描述当前尚未完成块副本复制的块的列表。因为HDFS支持块的流水线复制,当文件系统客户端开始向第一个Datanode复制数据块的时候,会在第一个 Datanode上启动复制进程,将该结点上接收到的(部分),数据块开始向第二个Datanode上传输复制,而第二个Datanode结点又向第三个 Datanode结点进行流水线复制,……,直到满足副本因子要求。所以,在执行流水线复制的过程中,因为这种可并行的复制方式使得某些块副本的复制完成 时间呈现阶梯状,也就是使用一个数据结构来保存这些尚未复制完成的块副本pendingReplications保存了所有正在进行复制的数据块,使用 Map是需要一些附加的信息PendingBlockInfo。这些信息包括时间戳, 用于检测是否已经超时,和现在进行复制的数目numReplicasInProgress。timedOutItems是超时的复制项,超时的复制项在 FSNamesystem的processPendingReplications方法中被删除,并从新复制。timerThread是用于检测复制超时 的线程的句柄,对应的线程是PendingReplicationMonitor的一个实例,它的run方法每隔一段会检查是否有超时的复制项,如果有, 将该数据块加到timedOutItems中。Timeout是run方法的检查间隔,defaultRecheckInterval是缺省值。在 ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications;
 
 
我们分两个方面一一分析:
(1)块冗余处理机制
          块冗余处理机制分为四个步骤:选择需要冗余的数据块、为数据块选择源节点、为数据块选择目标节点、将块和目标节点加入待冗余的队列。下面一一分析:
第一步:获取一定量需要冗余的数据块
    当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高, 则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为3个优先级。
UnderReplicatedBlocks 主要的成员是一个优先队列,List<TreeSet<Block>> priorityQueues,保存副本数没有达到期望值的block。根据当前副本数低于期望值的程度决定优先级,差的越远,优先级越大(0~2,0最 大)。每个优先级别对应一个TreeSet,getPriority获得优先级后决定放入哪个treeset中。
      对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN 节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以 HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块 数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的 优先级队列的偏移量,下次从该偏移指针处真正读取块信息。
 
      以上工作均在 chooseUnderReplicatedBlocks中分四步完成:

    synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {  
// initialize data structure for the return value  
//1 初始化三级数组  
List<List<Block>> blocksToReplicate =  new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);  
for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {  
blocksToReplicate.add(new ArrayList<Block>());  
}  
synchronized(neededReplications) {  
if (neededReplications.size() == 0) {  
missingBlocksInCurIter = 0;  
missingBlocksInPrevIter = 0;  
return blocksToReplicate;  
}  
//2.跳过已经处理的数据 replIndex为游标  
// Go through all blocks that need replications.  
BlockIterator neededReplicationsIterator = neededReplications.iterator();  
// skip to the first unprocessed block, which is at replIndex   
for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {  
neededReplicationsIterator.next();  
}  
// # of blocks to process equals either twice the number of live   
// data-nodes or the number of under-replicated blocks whichever is less  
//要取的block 和 neededReplications 数量 取较小值  
blocksToProcess = Math.min(blocksToProcess, neededReplications.size());  
//3.迭代获取需要冗余的Block,根据优先级放入不同的数据  
for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {  
if( ! neededReplicationsIterator.hasNext()) {  
// start from the beginning  
replIndex = 0;  
missingBlocksInPrevIter = missingBlocksInCurIter;  
missingBlocksInCurIter = 0;  
blocksToProcess = Math.min(blocksToProcess, neededReplications.size());  
if(blkCnt >= blocksToProcess)  
break;  
neededReplicationsIterator = neededReplications.iterator();  
assert neededReplicationsIterator.hasNext() :   
"neededReplications should not be empty.";  
}  
Block block = neededReplicationsIterator.next();  
int priority = neededReplicationsIterator.getPriority();  
if (priority < 0 || priority >= blocksToReplicate.size()) {  
LOG.warn("Unexpected replication priority: " + priority + " " + block);  
} else {  
blocksToReplicate.get(priority).add(block);  
}  
} // end for  
} // end synchronized  
return blocksToReplicate;  
}  
 
第二步:选择源节点
 
     

    private DatanodeDescriptor chooseSourceDatanode(Block block,List<DatanodeDescriptor> containingNodes,NumberReplicas numReplicas);  
依赖的数据结构  
(a)块(Block)到其元数据的映射表,元数据信息包括块所属的inode、存储块的Datanode  
BlocksMap blocksMap   
(b)保存损坏(如:校验没通过)的数据块到对应DataNode的关系   
CorruptReplicasMap corruptReplicas   
(c)保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>)   
Map<String, Collection<Block>> excessReplicateMap   
 
 主要逻辑:
    (a)判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;
    (b)判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;
    (c)如果某DataNode节点已经退役,遍历下一个DataNode节点;
    (d)如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而 网络拥 塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在 退役,会选择最后一个退役节点作为该块的source节点;
    (e)如果没有正在退役的节点,随意选择一个DN节点作为source节点;
 
第三步:选择目标节点       目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副 本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法。该算法我会专门用一节来详细讲述。
 
第四步:将块和目标节点加入待冗余的队列
srcNode.addBlockToBeReplicated(block, targets);
注意:DatanodeDescriptor中的BlockQueue replicateBlocks = new BlockQueue();
 
(2)块无效处理机制
  这种类型数据块对于集群没有作用,如果不删除,会占用磁盘空间。在几种情况下会产生无效块:
(a)执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;
(b)Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;
(c) 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负 载较轻的DataNode上的该数据块就成了无效块;处理的无效块数量也是有限制的:数量为活着的dn的32%, 即 nodesToProcess = (int)Math.ceil((double)heartbeats.size() * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100); 在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成 DataNode因为删除块而带宽,CPU资源等占用过大。
依赖数据结构: Map<String, Collection<Block>> recentInvalidateSets ,保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>)。

流程分析:
(a)周期性调用FSNameSystem的computeDatanodeWork方法
(b)computeDatanodeWork调用computeInvalidateWork方法
(c)循环调用invalidateWorkForOneNode方法,直到达到预设值或全部处理完
(d)invalidateWorkForOneNode方法中,从recentInvalidateSets结构中获得第一个datanode节点,找到 对应的DatanodeDescriptro,同时将其所有对应invalidate block遍历出
(e)将遍历出的invalidate block封装调用addBlocksToBeInvalidated方法,在下一次心跳汇报时返回给datanode进行相应处理
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-313669-1-1.html 上篇帖子: 第七章:小朱笔记hadoop之源码分析-hdfs分析 第一节:基本概念 下篇帖子: Hadoop深入学习:MapReduce作业的提交流程和作业的生命周期
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表