njsuntop 发表于 2018-10-28 11:37:51

Hadoop运维记录系列(二十四)

public synchronized List chooseUnderReplicatedBlocks(  
int blocksToProcess) {
  
// initialize data structure for the return value
  
List blocksToReplicate = new ArrayList(LEVEL);
  
for (int i = 0; i < LEVEL; i++) {
  
    blocksToReplicate.add(new ArrayList());
  
}
  

  
if (size() == 0) { // There are no blocks to collect.
  
    return blocksToReplicate;
  
}
  
int blockCount = 0;
  
for (int priority = 0; priority < LEVEL; priority++) {
  
    // Go through all blocks that need replications with current priority.
  
    BlockIterator neededReplicationsIterator = iterator(priority);
  
    Integer replIndex = priorityToReplIdx.get(priority);
  
    // skip to the first unprocessed block, which is at replIndex
  
    for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
  
      neededReplicationsIterator.next();
  
    }
  

  
    blocksToProcess = Math.min(blocksToProcess, size());
  
    if (blockCount == blocksToProcess) {
  
      break;// break if already expected blocks are obtained
  
    }
  
    // Loop through all remaining blocks in the list.
  
    while (blockCount < blocksToProcess
  
      && neededReplicationsIterator.hasNext()) {
  
      Block block = neededReplicationsIterator.next();
  
      blocksToReplicate.get(priority).add(block);
  
      replIndex++;
  
      blockCount++;
  
    }
  
    if (!neededReplicationsIterator.hasNext()
  
      && neededReplicationsIterator.getPriority() == LEVEL - 1) {
  
      // reset all priorities replication index to 0 because there is no
  
      // recently added blocks in any list.
  
      for (int i = 0; i < LEVEL; i++) {
  
      priorityToReplIdx.put(i, 0);
  
      }
  
      break;
  
    }
  
    priorityToReplIdx.put(priority, replIndex);
  
}
  
return blocksToReplicate;
  
}


页: [1]
查看完整版本: Hadoop运维记录系列(二十四)