// Limits how many datanodes are handled in each pass that processes invalid
// blocks. NOTE(review): the "PCT" in the name suggests this is a percentage of
// datanodes rather than an absolute count; confirm at the use site.
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
// Limits how much replication work is scheduled in each pass over the
// under-replicated blocks. NOTE(review): presumably multiplied by the number of
// live datanodes to obtain the per-pass block budget; confirm at the use site.
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
/**
 * Selects the next batch of blocks from {@code neededReplications}, grouped by
 * replication priority.
 *
 * <p>The scan resumes at the position recorded in {@code replIndex} and
 * advances {@code replIndex} by one for every block visited. When the end of
 * the queue is reached the scan wraps around to the beginning: {@code replIndex}
 * is reset to 0, {@code missingBlocksInCurIter} is copied into
 * {@code missingBlocksInPrevIter} and then reset to 0. If the queue is empty,
 * both counters are reset to 0 and the empty buckets are returned.
 *
 * <p>The method holds this object's monitor (it is {@code synchronized}) and
 * additionally locks {@code neededReplications} while scanning.
 *
 * @param blocksToProcess upper bound on the number of blocks to visit in this
 *        call; it is further capped by the current size of
 *        {@code neededReplications}
 * @return a list of {@code UnderReplicatedBlocks.LEVEL} buckets, where bucket
 *         {@code p} holds the visited blocks whose priority is {@code p};
 *         blocks with an out-of-range priority are logged and left out
 */
synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
  // Step 1: build the result with one empty bucket per priority level.
  List<List<Block>> blocksToReplicate =
      new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
  int bucket = 0;
  while (bucket < UnderReplicatedBlocks.LEVEL) {
    blocksToReplicate.add(new ArrayList<Block>());
    bucket++;
  }

  synchronized (neededReplications) {
    // Nothing is waiting for replication: clear both counters and bail out.
    if (neededReplications.size() == 0) {
      missingBlocksInCurIter = 0;
      missingBlocksInPrevIter = 0;
      return blocksToReplicate;
    }

    // Step 2: replIndex is the cursor left by the previous call; advance a
    // fresh iterator past everything that was already handled.
    BlockIterator cursor = neededReplications.iterator();
    int skipped = 0;
    while (skipped < replIndex && cursor.hasNext()) {
      cursor.next();
      skipped++;
    }

    // Never visit more blocks than are currently queued.
    int limit = Math.min(blocksToProcess, neededReplications.size());

    // Step 3: walk the queue and drop each block into its priority bucket.
    int visited = 0;
    while (visited < limit) {
      if (!cursor.hasNext()) {
        // Reached the end of the queue: wrap around to the beginning and
        // roll the per-iteration counter over into the previous one.
        replIndex = 0;
        missingBlocksInPrevIter = missingBlocksInCurIter;
        missingBlocksInCurIter = 0;
        limit = Math.min(limit, neededReplications.size());
        if (visited >= limit) {
          // Leaves replIndex at 0; the increments below are not reached.
          break;
        }
        cursor = neededReplications.iterator();
        assert cursor.hasNext() : "neededReplications should not be empty.";
      }

      Block candidate = cursor.next();
      int level = cursor.getPriority();
      if (level >= 0 && level < blocksToReplicate.size()) {
        blocksToReplicate.get(level).add(candidate);
      } else {
        LOG.warn("Unexpected replication priority: " + level + " " + candidate);
      }

      // Both counters move together, once per visited block.
      visited++;
      replIndex++;
    }
  }
  return blocksToReplicate;
}