glcui 发表于 2016-12-9 10:29:00

Hadoop源代码分析——io.*(二)

  从官方的WordCount中不难看出,Hadoop在读取文本时的至少要有两种分割的功能。即可以将一篇文档按行切割分离,同时可以将一行中的文本依据空格切割出来。这样,类似与Java的I/O操作,只不过是以单词为单位向下迭代。每次迭代时读出一个单词并取出。
  Text word = new Text();然后word.set(itr.nextToken());context.write(word, one) (one是数值为1的整数型变量)。将word逐次编号,并将word的value设置为1表示独到的单词已经出现了一次。最后将全部读取结果保存到Context类型的文本上。
  因此,从对文件读写要求的角度出发,Hadoop在文件系统包下建立了一些关乎I/O的接口。
  org.apache.hadoop.fs.Seekable
  该接口定义了三种方法。注意:以下方法是对任意文件的操作而非仅仅是针对文本的操作。
  1.void seek(long pos)
  从文件的开始处经过指定的步长pos到达新位置,read()指向新位置。步长不能超出文件。
  2.void getPos(long pos)
        返回当前位置的步长。
 
        3.boolean seekToNewSource(long targPos)
        判断在目标位置是否是当前内容的复本。
 
        与此类似org.apache.hadoop.fs.PositioneReadable定义了一些可读的方法.
        public int read(long position, byte[] buffer, int offset, int length) throws IOException;
        将描述翻译成中文是从文件中的一个指定位置position开始读取直到指定字节串byte[]的位置offset,返回读取到的字节个数length。
       貌似是要实现这样一个过程,仅是个人猜测:eg:I wish the wish you wish.
        int length = 0;
        String s = "you";
        bytes = s.toByteArray();
  read(7, bytes, 2, length);
  从I wish the wish you wish.从t到u一共移动了11个字符。运行完毕后length = 11。
  原文档中提到This dos not  change the current offset of a file, and is thread-safe.估计就是用length的增加取代offset的偏移,实现线程安全。仅是个人猜想read的一种实现及运行,原文档没有对构造传入的变量进行描述。
  public void readFully(long positon, byte[] buffer) throws IOException;
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
  根据文档的描述,其实现的思路和 public int read(long position, byte[] buffer, int offset, int length)是一致的。
  实现了以上两个接口的I/O类:
  org.apache.hadoop.io.compress.CompressionInputStream 
  该类在compress包下。在Hadoop集群中,为了保障通讯、存储的效益和质量,压缩操作的读写过程中时时进行,不难猜测,compress包实现了对I/O的压缩与解压功能。
  提到压缩,应从 org.apache.hadoop.io.compress.Compressor 开始分析。
 
  interface Compressor 的方法模仿java.util.zip.Deflater而来。
  通过实例化Deflater对象,可以对数据进行公有zip算法压缩。
public void justATry() {
            try {
                    String inputString = "blahblahblah??";
                    byte[] input = inputString.getBytes("UTF-8");
                     // Compress the bytes
                    byte[] output = new byte;
                    Deflater compresser = new Deflater();
                    compresser.setInput(input);
                    compresser.finish();
                    int compressedDataLength = compresser.deflate(output);
                    System.out.println(output);
 
                     // Decompress the bytes
                    Inflater decompresser = new Inflater();
                    decompresser.setInput(output, 0, compressedDataLength);
                    byte[] result = new byte;
                    int resultLength = decompresser.inflate(result);
                    decompresser.end();
                    for (int i = 0; i < result.length; i++) {
                            System.out.println(result);
                        }
                    // Decode the bytes into a String
                    String outputString = new String(result, 0, resultLength, "UTF-8");
                    System.out.println(outputString);
                } catch (java.io.UnsupportedEncodingException ex) {
            } catch (java.util.zip.DataFormatException ex) {
        }
        以上方法为Deflater对数据的压缩解压过程。不难分析:interface Compressor  应有类似的public void setInput(byte[] input);public void finish();public int deflate(byte[] output)方法。同理可得:org.apache.hadoop.io.compress.Decompressor 的解压方法。
        因此,可以猜测CompressionInputStream 实现PositionedReadable、Seekable 将数据读入。CompressionOnputStream 实现Compressor 对读入的内容进行压缩,输出压缩后的内容。同时可以猜测压缩输出的框架机制。
页: [1]
查看完整版本: Hadoop源代码分析——io.*(二)