设为首页 收藏本站
查看: 552|回复: 0

[经验分享] [笔记]hadoop mapred InputFormat分析

[复制链接]

尚未签到

发表于 2016-12-8 09:54:34 | 显示全部楼层 |阅读模式
Hadoop MapReduce的编程接口层主要有5个可编程组件,分别为InputFormat、Mapper、Partitioner、Reducer和OutputFormat。
InputFormat
主要用于描述输入数据的格式,提供两个功能:
数据切分:将输入数据切分为若干个split(分片),每个split会被分发到一个Map任务中。
记录识别:通过创建RecordReader,使用它将某个split(分片)中的记录(key, value形式)识别出来(Mapper使用split前的初始化),每个记录会作为Mapper中map函数的输入。



public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;

getSplits:
引用
Logically split the set of input files for the job.
Each InputSplit is then assigned to an individual Mapper for processing.
Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be <input-file-path, start, offset> tuple. The InputFormat also creates the RecordReader to read the InputSplit.

它只在逻辑上对输入数据进行分片,并不会在磁盘上将其切片分成分片进行存储。InputSplit只记录了分片的元数据信息(起始位置、长度以及所在的节点列表等)。
createRecordReader:
引用
Create a record reader for a given split. The framework will call RecordReader.initialize(InputSplit, TaskAttemptContext) before the split is used.

FileInputFormat的示例:
DSC0000.png
InputFormat (org.apache.hadoop.mapreduce) 子类层次图:
DSC0001.png
TextInputFormat分析

    文件切分算法
    文件切分算法主要决定InputSplit的个数以及每个InputSplit对应的数据段。TextInputFormat继承FileInputFormat,以文件为单位切分生成InputSplit。
    引用
    protected long computeSplitSize(long blockSize, long minSize,
                                      long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

    在计算splitSize中使用了blockSize, minSize, maxSize。
    blockSize:文件在HDFS中存储的block的大小,默认为64MB,通过dfs.block.size设置。
    minSize:InputSplit的最小值,由配置参数mapred.min.split.size设置,默认值为1。
    maxSize:InputSplit的最大值,由配置参数mapred.max.split.size设置,默认值为Long.MAX_VALUE。
    一旦确定splitSize值后,FileInputFormat将文件一次切成大小为splitSize的InputSplit,最后剩下不足splitSize的数据块单独成为一个InputSplit。
         long bytesRemaining = length;
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
    blkLocations[blkIndex].getHosts()));
    bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
    splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
    blkLocations[blkLocations.length-1].getHosts()));
    }
    FileSplit
    FileSplit继承InputSplit,包含了InputSplit所在的文件、起始位置、长度以及所在host的列表。
      /** Constructs a split with host information
    *
    * @param file the file name
    * @param start the position of the first byte in the file to process
    * @param length the number of bytes in the file to process
    * @param hosts the list of hosts containing the block, possibly null
    */
    public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
    }
    其中hosts的获取是通过InputSplit的所在文件查找(向NameNode)获取文件的所有BlockLocation,并通过InputSplit的起始位置查找对应的blkIndex,然后通过blkIndex获取对应BlockLocation的host信息。
    LineRecordReader
      public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
    TaskAttemptContext context) {
    return new LineRecordReader();
    }
    LineRecordReader继承了RecordReader类,并适配了LineReader类。LineReader类通过构建了buffer字节数组缓冲(缓冲区大小由参数io.file.buffer.size设置,默认为64K),将数据从流中读出(DFSClient.DFSInputStream.read(byte buf[], int off, int len)),当Record跨块时,会重新定位node,并至少再次读取一次(从新定位的node中读取buffer长度的字节数组)
                if (pos > blockEnd) {
    currentNode = blockSeekTo(pos);
    }
    int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
    int result = readBuffer(buf, off, realLen);

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311339-1-1.html 上篇帖子: Chapter 1. Meet Hadoop 下篇帖子: hadoop中的各种排序
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表