设为首页 收藏本站
查看: 605|回复: 0

[经验分享] Hadoop源代码分析——io.*(二)

[复制链接]

尚未签到

发表于 2016-12-9 10:29:00 | 显示全部楼层 |阅读模式
  从官方的WordCount中不难看出,Hadoop在读取文本时的至少要有两种分割的功能。即可以将一篇文档按行切割分离,同时可以将一行中的文本依据空格切割出来。这样,类似与Java的I/O操作,只不过是以单词为单位向下迭代。每次迭代时读出一个单词并取出。
  Text word = new Text();然后word.set(itr.nextToken());context.write(word, one) (one是数值为1的整数型变量)。将word逐次编号,并将word的value设置为1表示独到的单词已经出现了一次。最后将全部读取结果保存到Context类型的文本上。
  因此,从对文件读写要求的角度出发,Hadoop在文件系统包下建立了一些关乎I/O的接口。
  org.apache.hadoop.fs.Seekable
  该接口定义了三种方法。注意:以下方法是对任意文件的操作而非仅仅是针对文本的操作。
  1.void seek(long pos)
  从文件的开始处经过指定的步长pos到达新位置,read()指向新位置。步长不能超出文件。
  2.void getPos(long pos)
        返回当前位置的步长。
 
        3.boolean seekToNewSource(long targPos)
        判断在目标位置是否是当前内容的复本。
 
        与此类似org.apache.hadoop.fs.PositioneReadable定义了一些可读的方法.
        public int read(long position, byte[] buffer, int offset, int length) throws IOException;
        将描述翻译成中文是从文件中的一个指定位置position开始读取直到指定字节串byte[]的位置offset,返回读取到的字节个数length。
       貌似是要实现这样一个过程,仅是个人猜测:eg:I wish the wish you wish.
        int length = 0;
        String s = "you";
        bytes = s.toByteArray();
  read(7, bytes, 2, length);
  从I wish the wish you wish.从t到u一共移动了11个字符。运行完毕后length = 11。
  原文档中提到This dos not  change the current offset of a file, and is thread-safe.估计就是用length的增加取代offset的偏移,实现线程安全。仅是个人猜想read的一种实现及运行,原文档没有对构造传入的变量进行描述。
  public void readFully(long positon, byte[] buffer) throws IOException;
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
  根据文档的描述,其实现的思路和 public int read(long position, byte[] buffer, int offset, int length)是一致的。
  实现了以上两个接口的I/O类:
  org.apache.hadoop.io.compress.CompressionInputStream 
  该类在compress包下。在Hadoop集群中,为了保障通讯、存储的效益和质量,压缩操作的读写过程中时时进行,不难猜测,compress包实现了对I/O的压缩与解压功能。
  提到压缩,应从 org.apache.hadoop.io.compress.Compressor 开始分析。
 
  interface Compressor 的方法模仿java.util.zip.Deflater而来。
  通过实例化Deflater对象,可以对数据进行公有zip算法压缩。
public void justATry() {
            try {
                    String inputString = "blahblahblah??";
                    byte[] input = inputString.getBytes("UTF-8");
                     // Compress the bytes
                    byte[] output = new byte[100];
                    Deflater compresser = new Deflater();
                    compresser.setInput(input);
                    compresser.finish();
                    int compressedDataLength = compresser.deflate(output);
                    System.out.println(output);
 
                     // Decompress the bytes
                    Inflater decompresser = new Inflater();
                    decompresser.setInput(output, 0, compressedDataLength);
                    byte[] result = new byte[100];
                    int resultLength = decompresser.inflate(result);
                    decompresser.end();
                    for (int i = 0; i < result.length; i++) {
                            System.out.println(result);
                        }
                    // Decode the bytes into a String
                    String outputString = new String(result, 0, resultLength, "UTF-8");
                    System.out.println(outputString);
                } catch (java.io.UnsupportedEncodingException ex) {
            } catch (java.util.zip.DataFormatException ex) {
        }
        以上方法为Deflater对数据的压缩解压过程。不难分析:interface Compressor  应有类似的public void setInput(byte[] input);public void finish();public int deflate(byte[] output)方法。同理可得:org.apache.hadoop.io.compress.Decompressor 的解压方法。
        因此,可以猜测CompressionInputStream 实现PositionedReadableSeekable 将数据读入。CompressionOnputStream 实现Compressor 对读入的内容进行压缩,输出压缩后的内容。同时可以猜测压缩输出的框架机制。[size=1em]

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311860-1-1.html 上篇帖子: Hadoop之Cloudera Manager CDH4卸载 下篇帖子: hadoop+hbase+hive日常异常记录
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表