yllplay 发表于 2018-10-30 07:33:10

Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析

public abstract class FileInputFormat extends InputFormat {  
    // Generate the list of files and make them into FileSplits
  
    public List getSplits(JobContext job) throws IOException {
  
      // 1. 通过JobContext中获取List;
  
      // 2. 遍历文件属性数据
  
      //    2.1. 如果是空文件,则初始化一个无主机信息的FileSplits实例;
  
      //    2.2. 非空文件,判断是否分片,默认是分片的
  
      //         如果不分片则每个文件作为一个FileSplit
  
      //         计算分片大小splitSize
  

  
      // getFormatMinSplitSize()返回固定值1
  
      // getMinSplitSize(job)通过Configuration获取,配置参数为(mapred-default.xml):
  
      // mapreduce.input.fileinputformat.split.minsize默认值为0
  
      // minSize的值为1
  
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  
      // 实际调用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
  
      // 通过Configuration获取,配置参数为(mapred-default.xml无该参数):
  
      // mapreduce.input.fileinputformat.split.maxsize
  
      // 未配置该参数,取Long.MAX_VALUE,maxSize的值为Long.MAX_VALUE
  
      long maxSize = getMaxSplitSize(job);
  

  
      // generate splits
  
      List splits = new ArrayList();
  
      List files = listStatus(job);
  
      for (FileStatus file: files) {
  
          Path path = file.getPath();   // 在HDFS上的绝对路径
  
          long length = file.getLen();    // 文件的实际大小
  
          if (length != 0) {
  
            BlockLocation[] blkLocations;
  
            if (file instanceof LocatedFileStatus) {
  
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  
            } else {
  
            FileSystem fs = path.getFileSystem(job.getConfiguration());
  
            blkLocations = fs.getFileBlockLocations(file, 0, length);
  
            }
  
            if (isSplitable(job, path)) {
  
            // 这里取的是Block块的大小,在2.6里面默认是134217728(即128M)
  
            long blockSize = file.getBlockSize();
  
            // 获取切片大小,computeSplitSize(blockSize, minSize, maxSize)实际调用:
  
            //          1                Long.MAX_VALUE   128M
  
            // Math.max(minSize, Math.min(maxSize,      blockSize));
  
            // split的大小刚好等于block块的大小,为128M
  
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  

  
            long bytesRemaining = length;   // 取文件的实际大小
  
            // 如果文件的实际大小/splitSize > 1.1(即实际大小大于128M * 1.1)
  
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  
                // getBlockIndex判断is the offset inside this block?
  
                // 第一次length-bytesRemaining的值为0,取block块的第一个复本
  
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  
                            blkLocations.getHosts(),
  
                            blkLocations.getCachedHosts()));
  
                bytesRemaining -= splitSize;    // 依次减去分片的大小,对剩余长度再次分片
  
            }
  

  
            /**
  
            * 加入有一个300M的文件,设置bytesRemaining = length = 300M;
  
            * 1、判定bytesRemaining / splitSize = 300 / 128 > 1.1
  
            *makeSplie-->FileSplit(path, length - bytesRemaining = 0, splitSize=128M)
  
            *bytesRemaining -= splitSize => bytesRemaining = 172M
  
            * 2、判定bytesRemaining / splitSize = 172 / 128 > 1.1
  
            *makeSplie-->FileSplit(path, length - bytesRemaining = 128, splitSize=128M)
  
            *bytesRemaining -= splitSize => bytesRemaining = 44M
  
            * 3、判定bytesRemaining / splitSize = 44 / 128 < 1.1
  
            *while循环结束。
  
            */
  

  
            // 多次分片后,最后的数据长度仍不为0但又不足一个分片大小
  
            if (bytesRemaining != 0) {
  
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  
                           blkLocations.getHosts(),
  
                           blkLocations.getCachedHosts()));
  
                // 在这里把最后的44M又make了一个分片
  
                // makeSplie-->FileSplit(path, length - bytesRemaining = 256, splitSize=44)
  
            }
  
            } else { // not splitable,就取实际大小
  
            splits.add(makeSplit(path, 0, length, blkLocations.getHosts(),
  
                        blkLocations.getCachedHosts()));
  
            }
  
          } else {
  
            //Create empty hosts array for zero length files
  
            splits.add(makeSplit(path, 0, length, new String));
  
          }
  
      }
  
      // Save the number of input files for metrics/loadgen
  
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  

  
      return splits;
  
    }
  
}


页: [1]
查看完整版本: Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析