TOUVE 发表于 2016-12-12 09:48:56

Hadoop源码学习系列—— HAR归档Job设计要点

  hadoop版本:CDH3u2
    hadoop 中生成har文件是通过mapred job实现的,这个主要的class是“HadoopArchives.java”,它是tools包里面的一个类。这个类有800多行code,包含map reduce 自定义的inputformat和其它辅助方法,细读起来还需要花点时间。

 一个har job命令行举例:

# 归档"/fc/src/2011/1"这三个文件夹到"/fc/har/2011/"
hadoop archive -archiveName combine.har -p /fc/src/2011/ 10 11 12 /fc/har/2011/

几个重要的知识点:

1. 生成文件列表文件

 举例命令:

sudo -u hdfs hadoop archive -archiveName src.har -p /user/heipark/fc/ /user/heipark
  下面是input path,即"/user/heipark/fc/" 目录结构(红色为文件夹):
  /user/heipark/fc/
  - 0000
  - 1111
  - 2222
  - 3333
  - 4444
      + sub1
  - 5555
          + dir1
  - 7777
  - 8888
      + sub2
  - 6666
  运行命令过程中生成如下文件:"/user/hdfs/.staging/har_93ftj7/_har_src_files"
  har_93ftj7:har为固定前缀,后面为随机字符串
  _har_src_files:固定文件名,它是输入文件的meta文件,算法采用广度优先,内容如下(为了便于阅读,版式上略做调整):

 _har_src_files文件如下:

0 / dir 0000 1111 2222 3333 4444 sub1 sub2   // ① 
1066026506 /0000 file  // ② 
1066026506 /1111 file  // ② 
38268381 /2222 file      // ② 
62016424 /3333 file      // ② 
54855349 /4444 file      // ② 

0 /sub1 dir 5555 dir1
909247 /sub1/5555 file

0 /sub1/dir1 dir 7777 8888
34193078 /sub1/dir1/7777 file
2095606385 /sub1/dir1/8888 file

0 /sub2 dir 6666
95644473 /sub2/6666 file
  ① 这行的信息表示top level path的文件夹和文件。"0":dir的大小都为0;"/":表示当前为输入文件的top level path;"dir":表示当前是一个dir;"1111 2222 3333 4444 sub1 sub2":top level path 下面的文件和文件夹
  ② 这4行表示"top level path"下面的5个文件,第一列为文件大小,第二列为文件名,第三列标识当前是一个文件

2. 如何确定map和reduce task数量

  2.1 map task数量: 

公式:

totalSize / partSize
      totalSize:所有输入文件总大小,这个是在递归文件列表的时候累加得来的。
      partSize: 默认为2GB,它是一个常量,只能通过修改代码变更。

  2.2 reduce 数量
  reduce数量为1。reduce主要作用是汇总map输出的文件meta 信息,并生成index文件。

3. 关于"HArchiveInputFormat"
  它的作用是读取"_har_src_files"内容(input文件列表),并根据split size(totalSize/numMapTask)生成input split。
    这里生成split过程比较巧妙,FileSplit是根据"_har_src_files"的偏移量分割的,简单说就是一行一行读取,并累加读取文件大小,当达到"split size"就输出一个FileSplit,最终将不足split size的文件放到一个split中,代码如下:

new FileSplit(src, startPos, size, (String[]) null)
  src:"_har_src_files"文件
  startPos:一个split开始行的的bytes,这个偏移是_har_src_files文件的bytes偏移
  size:一个split,在_har_src_files文件bytes偏移大小,可以这样理解:startPos + size = endPos

4. 关于Input Path
  因为map task不直接读取input file,而是通过"_har_src_files"读取自己处理的FileSplit中文件,所以map task真正的输入路径是"_har_src_files"所在目录。

FileInputFormat.addInputPath(conf, jobDirectory);
  jobDirectory:conf中的“har.job.dir”,就是"_har_src_files"所在文件夹。

5. 关于Map Task
  Map Task的输入是"_har_src_files"文件中属于当前map处理的行。
  map输出的key是当前文件或者文件夹名称的hash值,value是文件或者文件夹的meta信息。
  如果当前行是"dir",处理逻辑如下:

# "_har_src_files"中一个dir行
# 0 / dir 0000 1111 2222 3333 4444 sub1 sub2

①输出到reduce的信息
/ dir none 0 0 0000 1111 2222 3333 4444 sub1 sub2



  "/": 目前处理的路径
  "dir none 0 0":表示文件类型是dir
  "0000 1111 2222 3333 4444 sub1 sub2":表示"/"路径下包含的文件和路径
  如果当前行为"file",处理逻辑如下:

 

# "_har_src_files"文件行
# 1066026506 /0000 file

① copy 文件"0000"到har文件内"part-X"中

② 输出到reduce信息
/0000 file part-1 0 1066026506
  "/0000":当前处理文件
  "file":当前处理的是一个文件
  "part-1":文件"0000"输出文件名
  "0":文件"0000"在"part-1"中起始偏移bytes
  "1066026506":问"0000"文件大小

6. 关于reduce
  reduce的工作就是生成"_masterindex"、"_index"两个索引文件。
  先说"_index"文件,它直接汇总并输出map端output的value值,如下:

 "_index"文件:

/ dir none 0 0 0000 1111 2222 3333 4444 sub1 sub2
/0000 file part-1 0 1066026506
/1111 file part-1 1066026506 1066026506
/2222 file part-1 2132053012 38268381
/3333 file part-1 2170321393 62016424
/4444 file part-0 0 54855349
/sub1 dir none 0 0 5555 dir1
/sub2 dir none 0 0 6666
/sub1/5555 file part-0 54855349 909247
/sub1/dir1 dir none 0 0 7777 8888
/sub2/6666 file part-2 0 95644473
/sub1/dir1/7777 file part-0 55764596 34193078
/sub1/dir1/8888 file part-0 89957674 2095606385
  再说“_masterindex” ,它是"_index"文件的索引文件。每一行索引一批文件(每批1000个,不足1000个算也算一批)它的第一行为“1”,这个值是version信息,这也许HarFileSystem开发人员打算支持对har文件的更新留下的功能。
  reduce的输入value是经过排序的,又因为key为value中path部分的hash,所以reduce的key值是追加变大的。
  本例"_masterindex":

#文件version 


 #startIndex endIndex startPos indexStream.getPos() 
0 771396412 0 493
  startIndex:第一个文件startIndex(即文件path的hash)。默认值为0,处理玩一批文件后,会把endIndex赋值给startIndex。
       endIndex :最后一个文件的endIndex (即文件path的hash值)
  startPos:本批文件在"_index"文件中bytes偏移值
       indexStream.getPos():本批文件在"_index"中结束bytes值
  -- heipark
页: [1]
查看完整版本: Hadoop源码学习系列—— HAR归档Job设计要点