第七章:小朱笔记hadoop之源码分析-hdfs分析 第九节:block Recovery过程分析
第七章:小朱笔记hadoop之源码分析-hdfs分析第九节:block Recovery过程分析
Lease Recovery Algorithm lease recovery算法:
1) Namenode retrieves lease information
name node查找到lease的信息
2) For each file f in the lease, consider the last block b off
对于lease中的每一个文件,获取其最后一个block b进行以下处理
2.1)Get the datanodes which contains b
获取包含block b 的全部data node
2.2)Assign one of the datanodes as the primary datanode p
获取一个data node作为其primary data node
2.3)p obtains a new generation stamp form the namenode
primary datanode向 nn获取一个新的generation stamp
2.4)p get the block info from each datanode
primary datanode 向每一个dn获取block info
2.5)p computes the minimum block length
primary datanode 计算最小块的长度
2.6) p updates the datanodes, which have a valid generation stamp,with the new generation stamp and the minimum block length
primary datanode 用最小块的长度和新生成的 genetation stamp来更新 dn
2.7)p acknowledges the namenode the update results
primary datanodeack nn update的结果
2.8)Namenode updates the BlockInfo
nn 更新 block info
2.9)Namenode removes f from the lease and removes the lease once all files have been removed
nn 删除文件 f 的lease
2.10) Namenode commit changes to edit log
nn向edit log提交 lease 这个change
一个client在持有某个文件的Lease情况下,如果写入数据过程中发生宕机,或者其他事故,导致无法继续对文件进行写入。由于该文件的Lease是由namenode来维护的,此时namenode认为该文件正在被该client持有,所以其他client对该文件是不允许进行写入的。
为了解决上面的问题,namenode中对某个client对应某个文件的Lease是有一个限期的,一旦过了这 个限期,该Lease没有发生任何改变(比如更新时间),没有写入任何数据,那么namenode就认为该lease对应的client发生了异常,需要在 namenode端对这个Lease进行释放,以便其他的client能够对文件进行写入操作。释放Lease的时候会处理正在写入的文件,把该文件的最后一个block和targets datanode数组加入到需要recovery的队列中,进行处理之后等待目标datanode心跳获取该数据。
datanode获取了需要recovery的block的数据,会遍历targets datanode进行recovery操作,recovery结束会按照最小块的长度进行截取、更新块信息和元信息。
第一步:namenode租约检查,准备recovery block 数据
LeaseManager.Monitor.checkLeases 关于租约过程有专门一节讲解。
(1)从namenode内存中找到该filePath对应的文件INode,通常这个时候该INode是一个INodeFileUnderConstruction的实例,表示这个文件是正在被写入,还没有complete的一个文件。
(2)如果该文件的Targets为空,且该文件的block队列为空,表示这个文件是个空文件,那么直接将该文件complete,删除其对应的lease记录,然后返回
(3)如果该文件的Targets为空,且该文件的block队列非空,那么获取该文件的block队列,并找到该队列的最后一个block,将该block的最后一个block对应的 datanode设置为该文件的targets,这个操作的原因在于:由于HDFS的文件只能向最后一个block写入输入,所以lease过期肯定是出 了最后一个block有问题外,其他block应该都是完整的,所以获取最后一个block。而targets表示最后一个block应该保存在哪几个 datanode上,该targets是一个datanode队列,也就是说,namenode知道这最后一个block是在这几台datanode 上,以便向这几个datanode发送block recovery命令
(4)在targets队列中选择一个datanode作为primary datanode,把该文件的最后一个block和targets datanode数组加入到primary datanode的recoverBlocks中。重点关注recoverBlocks这个队列
(5)修改Lease Holder为'NN_Recovery'并续组
//源码一:
if(pendingFile.getTargets()==null||pendingFile.getTargets().length== 0){
if (pendingFile.getBlocks().length == 0) {
finalizeINodeFileUnderConstruction(src, pendingFile);
return;
}
// setup the Inode.targets for the last block from the blocksMap
//最后一个block对应的datanode设置为该文件的targets
Block[] blocks = pendingFile.getBlocks();
Block last = blocks;
DatanodeDescriptor[] targets =new DatanodeDescriptor;
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
for (int i = 0; it != null && it.hasNext(); i++) {
targets = it.next();
}
pendingFile.setTargets(targets);
}
//指定primary datanode,start lease recovery of the last block for this file.
pendingFile.assignPrimaryDatanode();
//再分配Lease 注意Holder NN_Recovery
Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
//续约
leaseManager.renewLease(reassignedLease);
//源码2:
//将该INodeFileUnderConstruction文件分配给指定的客户端进程,也就是执行租约恢复的操作为该文件初始化租约的恢复的处理(存储选择的主Datanode所激活的块列表) ,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标 DataNode数组>加到该DataNode的recoverBlocks队列中;
void assignPrimaryDatanode() {
//assign the first alive datanode as the primary datanode
// 指派第一个活跃的为主Datanode结点
if (targets.length == 0) {
NameNode.stateChangeLog.warn("BLOCK*"
+ " INodeFileUnderConstruction.initLeaseRecovery:"
+ " No blocks found, lease removed.");
}
int previous = primaryNodeIndex;
//primaryNodeIndex初始值是-1,它用来保证每次找到的primary不在同一个位置
// 从索引previous开始查找到一个活跃的Datanode进程
for(int i = 1; i <= targets.length; i++) {
int j = (previous + i)%targets.length;
if (targets.isAlive) { // 保证第j个Datanode处于活跃状态
DatanodeDescriptor primary = targets;
//把该文件的最后一个block和targets datanode数组加入到
//primary datanode 的recoverBlocks中
primary.addBlockToBeRecovered(blocks, targets);
//存储被主Datanode激活的块,实际存储到该Datanode的块队列中
NameNode.stateChangeLog.info("BLOCK* " + blocks + " recovery started, primary=" + primary);
return;
}
}
}
第二步:datanode心跳获取recovery block
在 HDFS中,datanode是会每隔几秒钟向namenode定期的发送心跳,namenode会返回给datanode一个“命令集(cmds)”的,这个命令集就是namenode需要 datanode执行的某些操作,比如
(1)将该datanode上的某个block拷贝到其他datanode上去的DNA_TRANSFER命令
(2)将该datanode上的某个block从物理磁盘上删除的DNA_TRANSFER命令
(3)停止该datanode的DNA_SHUTDOWN命令
(4)对某个block进行block recovery的DNA_RECOVERBLOCK命令
cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
从上可以看出,从heartbeat的返回命令集中,就包括了对某个block进行recovery的命令。所以,datanode某个block进行 recovery操作的动作,实际上是来自namenode的指令。也就是说,namenode认为这个block需要做recovery了,并且这个 block在某几个datanode上保存,那么namenode就会在这几个datanode的heartbeat发送过来后,给这几个 datanode返回指令集,指令集中就包括对这个block进行recovery的指令。于是datanode接受到这个指令后,对block进行数据 本身的recovery操作。
注意: BlockCommand数据结构,它存储了需要执行的操作(action)以及这个操作涉及的block和这个block所对应的所有的datanode。
public class BlockCommand extends DatanodeCommand {
Block blocks[];
DatanodeInfo targets[][];
}
第三步:datanode执行 block recovery
datanode上最终会调用recoverBlock方法,此时closeFile=true。它是recoverLease发起的,此时要关闭文件,并使得这个文件的block在datanode上的信息一致。
Public Daemon recoverBlocks(final Block[] blocks,final DatanodeInfo[][]targets){
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", blocks, targets);
recoverBlock(blocks, false, targets, true);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks, e);
}
}
}
});
d.start();
return d;
}
(1)由于block recovery 是由primary datanode发起,但该recovery操作需要在三个datanode上对该block进行操作(假设文件副本为3),所以primary datanode接收到命令的时候同时还收到了该block的targets datanode数组(其中就包括该datanode自身)
(2)primary datanode遍历targets datanode数组,对每一个datanode,向其发送一个start block recovery的指令。判断本地调用还是远程RPC,如果是其自身,则直接执行该指令。
(3)start block recovery指令会在datanode的磁盘中找到该block的物理块,并确认该block对应的验证信息和meta信息正确,并返回一个BlockRecord对象,表示这个block正在被recovery。
(4)对每个BlockRecord,查看keepLength标志位是否为true,如果为true,则只recovery blocksize 跟 namenode中记录的blocksize一致的block,否则全部都算。并且block的size为BlockRecord最小的。
(5)对每个物理块,一旦真正开始recovery操作,则进行如下操作:在该datanode上找到该block,同时找到这个block对应的meta文件 (每一个block都对应一个meta文件,用来记录该block的验证码等原信息),更新该block的stamp号(表示该block已经被修改过一 次)
(6)如果需要recovery成的block的size 小于实际的block的size,则将实际的block截断成其需要的大小,并更新meta文件和验证信息。
第四步:datanode处理recovery结果
primary datanode调用各个BlockRecord对应的datanode进行Block同步,然后向namenode提交块同步信息。
for(BlockRecord r : syncList) {
try {
r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newblock + ", datanode=" + r.id + ")", e);
}
}
namenode.commitBlockSynchronization(block,newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,nlist);
(1)updateBlock 更新Block
updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。
public void updateBlock(Block oldblock, Block newblock) throws IOException {
if (oldblock.getBlockId() != newblock.getBlockId()) {
throw new IOException("Cannot update oldblock (=" + oldblock
+ ") to newblock (=" + newblock + ").");
}
// Protect against a straggler updateblock call moving a block backwards
// in time.
boolean isValidUpdate =
(newblock.getGenerationStamp() > oldblock.getGenerationStamp()) ||
(newblock.getGenerationStamp() == oldblock.getGenerationStamp() &&
newblock.getNumBytes() == oldblock.getNumBytes());
if (!isValidUpdate) {
throw new IOException(
"Cannot update oldblock=" + oldblock +
" to newblock=" + newblock + " since generation stamps must " +
"increase, or else length must not change.");
}
for(;;) {
final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
if (threads == null) {
return;
}
interruptAndJoinThreads(threads);
}
}
/**
* Try to update an old block to a new block.
* If there are ongoing create threads running for the old block,
* the threads will be returned without updating the block.
* 用于将旧块截断成新块(调用truncateBlock),并截断相应的元数据文件,以及更新ongoingCreates、volumeMap。
* @return ongoing create threads if there is any. Otherwise, return null.
*/
private synchronized List<Thread> tryUpdateBlock(Block oldblock, Block newblock) throws IOException {
//check ongoing create threads 获取与此块相关的文件及访问这个块的线程
ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
if (activeThreads != null) {
return activeThreads; //如果近期有对此块进行操作,返回存活的操作线程
}
//近期无对此块操作的线程,就更新块
////获得旧块的文件
//No ongoing create threads is alive.Update block.
File blockFile = findBlockFile(oldblock.getBlockId());
if (blockFile == null) {
throw new IOException("Block " + oldblock + " does not exist.");
}
File oldMetaFile = findMetaFile(blockFile);
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
// First validate the update
//update generation stamp
//旧块stamp比新块stamp大,不合法
if (oldgs > newblock.getGenerationStamp()) {
throw new IOException("Cannot update block (id=" + newblock.getBlockId()
+ ") generation stamp from " + oldgs
+ " to " + newblock.getGenerationStamp());
}
//update length
//新块的大小大于旧块的大小
if (newblock.getNumBytes() > oldblock.getNumBytes()) {
throw new IOException("Cannot update block file (=" + blockFile
+ ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
}
// Now perform the update
//rename meta file to a tmp file
//旧块元数据文件重命名
File tmpMetaFile = new File(oldMetaFile.getParent(),oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
if (!oldMetaFile.renameTo(tmpMetaFile)){
throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
}
//新块的大小小于旧块的大小 截断旧块和旧块的元数据文件
if (newblock.getNumBytes() < oldblock.getNumBytes()) {
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
}
//rename the tmp file to the new meta file (with new generation stamp)
File newMetaFile = getMetaFile(blockFile, newblock);
if (!tmpMetaFile.renameTo(newMetaFile)) {
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
}
updateBlockMap(ongoingCreates, oldblock, newblock);
updateBlockMap(volumeMap, oldblock, newblock);
// paranoia! verify that the contents of the stored block
// matches the block file on disk.
validateBlockMetadata(newblock);
return null;
}
//truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。
static void truncateBlock(File blockFile, File metaFile,long oldlen, long newlen) throws IOException {
if (newlen == oldlen) {
return;
}
if (newlen > oldlen) {
throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+ ") to newlen (=" + newlen + ")");
}
if (newlen == 0) {
// Special case for truncating to 0 length, since there's no previous
// chunk.
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
try {
//truncate blockFile
blockRAF.setLength(newlen);
} finally {
blockRAF.close();
}
//update metaFile
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
try {
metaRAF.setLength(BlockMetadataHeader.getHeaderSize());
} finally {
metaRAF.close();
}
return;
}
//由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样,
//所有setLength进行截断后,要读取最后一个校验和字段
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
long newChunkCount = (newlen - 1)/bpc + 1;//校验和的段数
long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;//新的校验和文件的长度
long lastchunkoffset = (newChunkCount - 1)*bpc;//最后一个校验和字段的偏移位置
int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置
byte[] b = new byte;
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取
try {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
} finally {
blockRAF.close();
}
//compute checksum
dcs.update(b, 0, lastchunksize);
dcs.writeValue(b, 0, false);
//update metaFile
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
try {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
} finally {
metaRAF.close();
}
}
(2) commitBlockSynchronization
参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
处理流程:
参数检查,获取对应的文件,记为pendingFile;从BlocksMap中删除老的信息;如果deleteblock为true,从pendingFile删除Block记录;否则,更新Block的信息;如果不关闭文件,那么写日志保存更新,返回;最后如果关闭文件的话,调用finalizeINodeFileUnderConstruction。
页:
[1]