(安西) posted on 2018-10-31 10:48:27

Hadoop job split handling and task locality analysis (source code analysis, part 1)

The method below is FileInputFormat.getSplits() from the new MapReduce API. It walks the job's input files, carves each splittable file into FileSplits of roughly one block each, and records the hosts holding the corresponding block; those host lists are the raw material for task locality.

/**
 * Generate the list of files and make them into FileSplits.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if ((length != 0) && isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      // carve off full-size splits while more than SPLIT_SLOP
      // split sizes of data remain
      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
        splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
        bytesRemaining -= splitSize;
      }

      // the tail (at most SPLIT_SLOP x splitSize) becomes the last split
      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                   blkLocations[blkLocations.length - 1].getHosts()));
      }
    } else if (length != 0) {
      // unsplittable file: a single split covering the whole file
      splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    } else {
      // Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }

  // Save the number of input files in the job-conf
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  LOG.debug("Total # of splits: " + splits.size());
  return splits;
}
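
Two helpers used above deserve a note. computeSplitSize() clamps the HDFS block size between the configured minimum and maximum split sizes, and SPLIT_SLOP (1.1 in the Hadoop source) lets the last split absorb a small tail so a file slightly larger than one split size is not cut in two. A minimal sketch (the class name SplitSizeDemo is mine, but the clamping formula matches FileInputFormat):

public class SplitSizeDemo {
  // 1.1 in the Hadoop source: a split may run over by at most 10%
  static final double SPLIT_SLOP = 1.1;

  // same formula as FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;               // 128 MB HDFS block
    long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);

    // A 135 MB file is only ~1.05 split sizes, below SPLIT_SLOP, so the
    // while loop in getSplits() never fires and the whole file becomes
    // a single split.
    long length = 135L * 1024 * 1024;
    System.out.println(((double) length) / splitSize > SPLIT_SLOP); // false
  }
}

With the default min/max settings, splitSize equals the block size, so in the common case one split corresponds to one HDFS block.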


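On the task-locality side: the host arrays stored in each FileSplit above are what the scheduler later reads back through InputSplit.getLocations() when it tries to place a map task on (or near) a node that already holds the data. A small sketch for inspecting them (the dump helper is hypothetical; getLocations() is the real API):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;

public class PrintSplitHosts {
  // dump is a hypothetical helper; pass it the list returned by getSplits()
  static void dump(List<InputSplit> splits)
      throws IOException, InterruptedException {
    for (InputSplit split : splits) {
      // getLocations() returns the host names recorded when the split was
      // built, i.e. the nodes holding the underlying HDFS block
      System.out.println(Arrays.toString(split.getLocations()));
    }
  }
}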