设为首页 收藏本站
查看: 789|回复: 0

[经验分享] Hadoop中CombineFileInputFormat详解

[复制链接]

尚未签到

发表于 2016-12-6 07:00:01 | 显示全部楼层 |阅读模式
  转http://blog.csdn.net/wawmg/article/details/17095125
在MR实践中,会有很多小文件,单个文件产生一个mapper,资源比较浪费,后续没有reduce逻辑的话,会产生很多小文件,文件数量暴涨,对后续的hive job产生影响。
所以需要在mapper中将多个文件合成一个split作为输入,CombineFileInputFormat满足我们的需求。
CombineFileInputFormat 原理(网上牛人总结):
 
第一次:将同DN上的所有block生成Split,生成方式:
1.循环nodeToBlocks,获得每个DN上有哪些block
2.循环这些block列表
3.将block从blockToNodes中移除,避免同一个block被包含在多个split中
4.将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从blockToNodes中被移除了,方便后面恢复到blockToNodes中
5.向临时变量curSplitSize增加block的大小
6.判断curSplitSize是否已经超过了设置的maxSize
a) 如果超过,执行并添加split信息,并重置curSplitSize和validBlocks
b) 没有超过,继续循环block列表,跳到第2步
7.当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于每个DN的最小split大小)
a) 如果允许,执行并添加split信息
b) 如果不被允许,将这些剩余的block归还blockToNodes
8.重置
9.跳到步骤1
 
[java] view plaincopy 



  • // process all nodes and create splits that are local     
  •     // to a node.      
  •     //创建同一个DN上的split     
  •     for (Iterator<Map.Entry<String,      
  •          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();      
  •          iter.hasNext();) {     
  •      
  •       Map.Entry<String, List<OneBlockInfo>> one = iter.next();     
  •       nodes.add(one.getKey());     
  •       List<OneBlockInfo> blocksInNode = one.getValue();     
  •      
  •       // for each block, copy it into validBlocks. Delete it from      
  •       // blockToNodes so that the same block does not appear in      
  •       // two different splits.     
  •       for (OneBlockInfo oneblock : blocksInNode) {     
  •         if (blockToNodes.containsKey(oneblock)) {     
  •           validBlocks.add(oneblock);     
  •           blockToNodes.remove(oneblock);     
  •           curSplitSize += oneblock.length;     
  •      
  •           // if the accumulated split size exceeds the maximum, then      
  •           // create this split.     
  •           if (maxSize != 0 && curSplitSize >= maxSize) {     
  •             // create an input split and add it to the splits array     
  •             //创建这些block合并后的split,并将其split添加到split列表中     
  •             addCreatedSplit(job, splits, nodes, validBlocks);     
  •             //重置     
  •             curSplitSize = 0;     
  •             validBlocks.clear();     
  •           }     
  •         }     
  •       }     
  •       // if there were any blocks left over and their combined size is     
  •       // larger than minSplitNode, then combine them into one split.     
  •       // Otherwise add them back to the unprocessed pool. It is likely      
  •       // that they will be combined with other blocks from the same rack later on.     
  •       //其实这里的注释已经说的很清楚,我再按照我的理解说一下     
  •       /**   
  •        * 这里有几种情况:   
  •        * 1、在这个DN上还有没有被split的block,   
  •        * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值),   
  •        * 将把这些block合并成一个split   
  •        * 2、剩余的block的大小还是没有达到,将剩余的这些block   
  •        * 归还给blockToNodes,等以后统一处理   
  •        */     
  •       if (minSizeNode != 0 && curSplitSize >= minSizeNode) {     
  •         // create an input split and add it to the splits array     
  •         addCreatedSplit(job, splits, nodes, validBlocks);     
  •       } else {     
  •         for (OneBlockInfo oneblock : validBlocks) {     
  •           blockToNodes.put(oneblock, oneblock.hosts);     
  •         }     
  •       }     
  •       validBlocks.clear();     
  •       nodes.clear();     
  •       curSplitSize = 0;     
  •     }     

  
第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)
 
 
[java] view plaincopy 



  • // if blocks in a rack are below the specified minimum size, then keep them     
  •     // in 'overflow'. After the processing of all racks is complete, these overflow     
  •     // blocks will be combined into splits.     
  •     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();     
  •     ArrayList<String> racks = new ArrayList<String>();     
  •      
  •     // Process all racks over and over again until there is no more work to do.     
  •     //这里处理的就不再是同一个DN上的block     
  •     //同一个DN上的已经被处理过了(上面的代码),这里是一些     
  •     //还没有被处理的block     
  •     while (blockToNodes.size() > 0) {     
  •      
  •       // Create one split for this rack before moving over to the next rack.      
  •       // Come back to this rack after creating a single split for each of the      
  •       // remaining racks.     
  •       // Process one rack location at a time, Combine all possible blocks that     
  •       // reside on this rack as one split. (constrained by minimum and maximum     
  •       // split size).     
  •      
  •       // iterate over all racks      
  •     //创建同机架的split     
  •       for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =      
  •            rackToBlocks.entrySet().iterator(); iter.hasNext();) {     
  •      
  •         Map.Entry<String, List<OneBlockInfo>> one = iter.next();     
  •         racks.add(one.getKey());     
  •         List<OneBlockInfo> blocks = one.getValue();     
  •      
  •         // for each block, copy it into validBlocks. Delete it from      
  •         // blockToNodes so that the same block does not appear in      
  •         // two different splits.     
  •         boolean createdSplit = false;     
  •         for (OneBlockInfo oneblock : blocks) {     
  •             //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split     
  •           if (blockToNodes.containsKey(oneblock)) {     
  •             validBlocks.add(oneblock);     
  •             blockToNodes.remove(oneblock);     
  •             curSplitSize += oneblock.length;     
  •            
  •             // if the accumulated split size exceeds the maximum, then      
  •             // create this split.     
  •             if (maxSize != 0 && curSplitSize >= maxSize) {     
  •               // create an input split and add it to the splits array     
  •               addCreatedSplit(job, splits, getHosts(racks), validBlocks);     
  •               createdSplit = true;     
  •               break;     
  •             }     
  •           }     
  •         }     
  •      
  •         // if we created a split, then just go to the next rack     
  •         if (createdSplit) {     
  •           curSplitSize = 0;     
  •           validBlocks.clear();     
  •           racks.clear();     
  •           continue;     
  •         }     
  •      
  •         //还有没有被split的block     
  •         //如果这些block的大小大于了同机架的最小split,     
  •         //则创建split     
  •         //否则,将这些block留到后面处理     
  •         if (!validBlocks.isEmpty()) {     
  •           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {     
  •             // if there is a mimimum size specified, then create a single split     
  •             // otherwise, store these blocks into overflow data structure     
  •             addCreatedSplit(job, splits, getHosts(racks), validBlocks);     
  •           } else {     
  •             // There were a few blocks in this rack that remained to be processed.     
  •             // Keep them in 'overflow' block list. These will be combined later.     
  •             overflowBlocks.addAll(validBlocks);     
  •           }     
  •         }     
  •         curSplitSize = 0;     
  •         validBlocks.clear();     
  •         racks.clear();     
  •       }     
  •     }     

  
最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了
 
 
 
源码总结:
 
合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack
将可以合并的block写到同一个split中
  下面是实践代码:
原始文件是70M每个的小文件,有些更小,sequence类型,需要自己实现RecordRead(Text就比较简单),key是byteWrite类型,现在需要减少文件个数,每个文件的大小接近block的大小。
自定义CombineSequenceFileInputFormat:

 
[java] view plaincopy 



  • package com.hadoop.combineInput;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.mapreduce.InputSplit;  
  • import org.apache.hadoop.mapreduce.RecordReader;  
  • import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  • import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;  
  • import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
  •   
  • public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {  
  •     @SuppressWarnings({ "unchecked""rawtypes" })  
  •     @Override  
  •     public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {  
  •         return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSequenceFileRecordReader.class);  
  •     }  
  • }  

  
实现 CombineSequenceFileRecordReader
 
 
[java] view plaincopy 



  • package com.hadoop.combineInput;  
  •   
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.mapreduce.InputSplit;  
  • import org.apache.hadoop.mapreduce.RecordReader;  
  • import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  • import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
  • import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  • import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;  
  • import org.apache.hadoop.util.ReflectionUtils;  
  •   
  •   
  • public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {  
  •     private CombineFileSplit split;  
  •     private TaskAttemptContext context;  
  •     private int index;  
  •     private RecordReader<K, V> rr;  
  •   
  •     @SuppressWarnings("unchecked")  
  •     public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {  
  •         this.index = index;  
  •         this.split = (CombineFileSplit) split;  
  •         this.context = context;  
  •   
  •         this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  
  •     }  
  •   
  •     @SuppressWarnings("unchecked")  
  •     @Override  
  •     public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {  
  •         this.split = (CombineFileSplit) curSplit;  
  •         this.context = curContext;  
  •   
  •         if (null == rr) {  
  •             rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  
  •         }  
  •   
  •         FileSplit fileSplit = new FileSplit(this.split.getPath(index),  
  •                 this.split.getOffset(index), this.split.getLength(index),  
  •                 this.split.getLocations());  
  •           
  •         this.rr.initialize(fileSplit, this.context);  
  •     }  
  •   
  •     @Override  
  •     public float getProgress() throws IOException, InterruptedException {  
  •         return rr.getProgress();  
  •     }  
  •   
  •     @Override  
  •     public void close() throws IOException {  
  •         if (null != rr) {  
  •             rr.close();  
  •             rr = null;  
  •         }  
  •     }  
  •   
  •     @Override  
  •     public K getCurrentKey()  
  •     throws IOException, InterruptedException {  
  •         return rr.getCurrentKey();  
  •     }  
  •   
  •     @Override  
  •     public V getCurrentValue()  
  •     throws IOException, InterruptedException {  
  •         return rr.getCurrentValue();  
  •     }  
  •   
  •     @Override  
  •     public boolean nextKeyValue() throws IOException, InterruptedException {  
  •         return rr.nextKeyValue();  
  •     }  
  • }  

  参考资料:http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java
 
main函数比较简单,这里也贴出来下,方便后续自己记忆:
 
[java] view plaincopy 



  • package com.hadoop.combineInput;  
  •   
  • import java.io.IOException;  
  •   
  •   
  • import org.apache.hadoop.conf.Configuration;  
  • import org.apache.hadoop.conf.Configured;  
  • import org.apache.hadoop.fs.Path;  
  •   
  • import org.apache.hadoop.io.BytesWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Job;  
  • import org.apache.hadoop.mapreduce.Mapper;  
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  • import org.apache.hadoop.util.Tool;  
  • import org.apache.hadoop.util.ToolRunner;  
  •   
  • public class MergeFiles extends Configured implements Tool {   
  •     public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {  
  •   
  •         public void map(BytesWritable key, Text value, Context context)  
  •                 throws IOException, InterruptedException {  
  •             context.write(key, value);  
  •         }  
  •     } // END: MapClass  
  •   
  •       
  •     public int run(String[] args) throws Exception {  
  •         Configuration conf = new Configuration();  
  •         conf.set("mapred.max.split.size""157286400");  
  •         conf.setBoolean("mapred.output.compress"true);  
  •         Job job = new Job(conf);  
  •         job.setJobName("MergeFiles");  
  •         job.setJarByClass(MergeFiles.class);  
  •   
  •         job.setMapperClass(MapClass.class);  
  •         job.setInputFormatClass(CombineSequenceFileInputFormat.class);  
  •         job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  •         job.setOutputKeyClass(BytesWritable.class);  
  •         job.setOutputValueClass(Text.class);  
  •   
  •         FileInputFormat.addInputPaths(job, args[0]);  
  •         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  •   
  •         job.setNumReduceTasks(0);  
  •   
  •         return job.waitForCompletion(true) ? 0 : 1;  
  •     } // END: run  
  •   
  •     public static void main(String[] args) throws Exception {  
  •         int ret = ToolRunner.run(new MergeFiles(), args);  
  •         System.exit(ret);  
  •     } // END: main  
  • //   

[java] view plaincopy 



  •   

性能测试:70M大小的压缩sequence文件,2000个,转换成是700个压缩sequence文件,平均每个200M(可控),blocksize=256,耗时2分半到3分钟。
 
存在问题:

  • 合并后会造成mapper不能本地化,带来mapper的额外开销,需要权衡

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310118-1-1.html 上篇帖子: hadoop(3)Upgrade to YARN and Installation 下篇帖子: Hadoop 0.23 config differ from 0.20.205
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表