hyytaojunming 发表于 2017-12-17 19:36:04

Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)

1 package com.dajiangtai.Hadoop.HDFS;  

2  
3 import java.io.IOException;
  
4 import java.net.URI;
  
5 import java.net.URISyntaxException;
  
6 import org.apache.hadoop.conf.Configuration;
  
7 import org.apache.hadoop.fs.FSDataInputStream;
  
8 import org.apache.hadoop.fs.FSDataOutputStream;
  
9 import org.apache.hadoop.fs.FileStatus;
  
10 import org.apache.hadoop.fs.FileSystem;
  
11 import org.apache.hadoop.fs.FileUtil;
  
12 import org.apache.hadoop.fs.Path;
  
13 import org.apache.hadoop.fs.PathFilter;
  
14 import org.apache.hadoop.hdfs.DistributedFileSystem;
  
15 import org.apache.hadoop.io.IOUtils;
  
16 /**
  
17* function 合并小文件至 HDFS   ,文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
  
18* @author 小讲
  
19*我们利用通配符和PathFilter 对象,将本地多种格式的文件上传至 HDFS文件系统,并过滤掉 txt文本格式以外的文件。
  
20*/

  
21 public>  
22   private static FileSystem fs = null;
  
23   private static FileSystem local = null;
  
24   /**
  
25      * @function main
  
26      * @param args
  
27      * @throws IOException
  
28      * @throws URISyntaxException
  
29      */
  
30   public static void main(String[] args) throws IOException,
  
31             URISyntaxException {
  
32         list();
  
33   }
  
34
  
35   /**
  
36      *
  
37      * @throws IOException
  
38      * @throws URISyntaxException
  
39      */
  
40   public static void list() throws IOException, URISyntaxException {
  
41         // 读取hadoop文件系统的配置
  
42         Configuration conf = new Configuration();
  
43 //      conf=Configuration
  
44 //      conf是Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
  
45         
  
46         //文件系统访问接口
  
47         URI uri = new URI("hdfs://djt002:9000");
  
48 //      uri=URI
  
49 //      uri是hdfs://djt002:9000
  
50         
  
51 //      URL、URI与Path三者的区别
  
52 //      Hadoop文件系统中通过Hadoop Path对象来代表一个文件   
  
53 //      URL(相当于绝对路径)    ->   (文件) ->    URI(相当于相对路径,即代表URL前面的那一部分)
  
54 //      URI:如hdfs://dajiangtai:9000
  
55 //      如,URL.openStream
  
56         
  
57         
  
58         
  
59         //获得FileSystem实例,即HDFS
  
60         fs = FileSystem.get(uri, conf);
  
61 //      fs=DistributedFileSystem
  
62 //      fs是DFS]
  
63         
  
64         //获得FileSystem实例,即Local
  
65         local = FileSystem.getLocal(conf);
  
66 //      local=LocalFileSystem
  
67 //      local是org.apache.hadoop.fs.LocalFileSystem@3ce1b8c5
  
68 //            为什么要获取到Local呢,因为,我们要把本地D盘下data/73目录下的文件要合并后,上传到HDFS里,所以,我们需先获取到Local,再来做合并工作啦!
  
69         
  
70         
  
71 //      18、列出文件或目录内容(主要是存放文件或目录的元数据,即大小,权限,副本,,,)
  
72 //      public FileStatus[] listStatus(Path f) throws IOException
  
73 //      public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException
  
74 //                PathFilter是路径过滤器
  
75 //      public FileStatus[] listStatus(Path[] files) throws IOException
  
76 //      public FileStatus[] listStatus(Path[] files,PathFilter filter)
  
77 //                传送Path数组和路径过滤器
  
78 //               
  
79 //               
  
80 //      19、FileUtil中的stat2Paths(),将一个FileStatus元数据对象数组转换为一个Path对象数组
  
81 //
  
82 //      20、(1)使用通配符来匹配多个目录下的多个文件,也是列出文件或目录内容(主要是存放文件或目录的元数据,即大小,权限,副本,,,)
  
83 //      public FileStatus[] globStatus(Path pathPattern) throws IOException
  
84 //      public FileStatus[] globStatus(Path pathPattern,PathFilter filter) throws IOException
  
85 //                  
  
86 //          (2)PathFilter对象
  
87 //      public interface PathFilter{
  
88 //            boolean accpet(Path path);
  
89 //      }      
  
90         
  
91         
  
92         
  
93         //过滤目录下的 svn 文件,globStatus从第一个参数通配符合到文件,剔除满足第二个参数到结果,因为PathFilter中accept是return!
  
94         FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般这是隐藏文件,所以得排除
  
95         //dirstatus=FileStatus
  
96 //      dirstatus是[DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17; isDirectory=true; modification_time=1427791478002; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
97 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18; isDirectory=true; modification_time=1427791505373; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
98 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-19; isDirectory=true; modification_time=1427791532277; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
99 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-20; isDirectory=true; modification_time=1427791553035; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
100 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-21; isDirectory=true; modification_time=1427791577709; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
101 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-22; isDirectory=true; modification_time=1427791602770; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
  
102 //      , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-23; isDirectory=true; modification_time=1427791647177; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}]
  
103         
  
104                        
  
105                         //      ^表示匹配我们字符串开始的位置               *代表0到多个字符                        $代表字符串结束的位置
  
106 //      RegexExcludePathFilter来只排除我们不需要的,即svn格式
  
107 //      RegexExcludePathFilter这个方法我们自己写
  
108         
  
109 //      但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
  
110         
  
111         //获取73目录下的所有文件路径,注意FIleUtil中stat2Paths()的使用,它将一个FileStatus对象数组转换为Path对象数组。
  
112         Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus数组类型
  
113 //      dirs=Path
  
114 //      dirs是    [file:/D:/data/73/2012-09-17
  
115 //               , file:/D:/data/73/2012-09-18
  
116 //               , file:/D:/data/73/2012-09-19
  
117 //               , file:/D:/data/73/2012-09-20
  
118 //               , file:/D:/data/73/2012-09-21
  
119 //               , file:/D:/data/73/2012-09-22
  
120 //               , file:/D:/data/73/2012-09-23]      
  
121               
  
122         
  
123         FSDataOutputStream out = null;//输出流
  
124 //      out=HdfsDaDataOutputStream
  
125 //      out是org.apache.hadoop.hdfs.client.HdfsDataOutputStream@2b11624e
  
126         
  
127         FSDataInputStream in = null;//输入流
  
128 //      in=ChecksumFileSystem&FSDataBoundedInputStream
  
129 //      in是org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@526d542f
  
130         
  
131 //      很多人搞不清输入流和输出流,!!!!
  
132 //      其实啊,输入流、输出流都是针对内存的
  
133 //      往内存里写,是输入流。
  
134 //      内存往文件里写,是输出Luis。
  
135 //      
  
136 //      比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
  
137 //         =>   则文件A写到内存里,叫输入流。
  
138 //         =>    则内存里写到文件B,叫输出流   
  
139         
  
140         
  
141         for (Path dir : dirs) {//for星型循环,即将dirs是Path对象数组,一一传给Path dir
  
142 //            dirs=Path
  
143 //            dirs是[file:/D:/data/73/2012-09-17
  
144 //                  , file:/D:/data/73/2012-09-18
  
145 //                  , file:/D:/data/73/2012-09-19
  
146 //                  , file:/D:/data/73/2012-09-20
  
147 //                  , file:/D:/data/73/2012-09-21
  
148 //                  , file:/D:/data/73/2012-09-22
  
149 //                  , file:/D:/data/73/2012-09-23]   
  
150            
  
151 //      dir= Path   
  
152 //      先传,dir是file:/D:/data/73/2012-09-17
  
153 //      再传,file:/D:/data/73/2012-09-18         
  
154 //      再传,file:/D:/data/73/2012-09-19   
  
155 //      再传,file:/D:/data/73/2012-09-20      
  
156 //      再传,file:/D:/data/73/2012-09-21      
  
157 //      再传,file:/D:/data/73/2012-09-22      
  
158 //      再传,file:/D:/data/73/2012-09-23      
  
159            
  
160             String fileName = dir.getName().replace("-", "");//文件名称
  
161 //                        先获取到如2012-09-17,然后经过replace("-", ""),得到20120917
  
162 //                                                                再获取,20120918
  
163 //                                                                再获取,20120919
  
164 //                                                                再获取,20120920
  
165 //                                                                再获取,20120921
  
166 //                                                                再获取,20120922
  
167 //                                                                再获取,20120923            
  
168            
  
169             //只接受日期目录下的.txt文件,^匹配输入字符串的开始位置,$匹配输入字符串的结束位置,*匹配0个或多个字符。
  
170             FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
  
171 //            先获取到,localStatus=FileStatus
  
172 //                   localStatus是
  
173 //                   再获取到,localStatus=FileStatus
  
174 //            localStatus是
  
175 //            再获取到,,,,不多赘述。
  
176            
  
177            
  
178 //            FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//试试,看有什么区别?
  
179
  
180 //            如果不设置过滤器,FileInputFormat 会使用一个默认的过滤器来排除隐藏文件。
  
181 //            如果通过调用 setInputPathFilter()设置了过滤器,它会在默认过滤器的基础上进行过滤。换句话说,自定义的过滤器只能看到非隐藏文件。
  
182            
  
183            
  
184                     //RegexAcceptPathFilter这个方法,我们自己写
  
185 //            RegexAcceptPathFilter来只接收我们需要,即txt格式
  
186 //            这里,我们还可以只接收别的格式,自己去改,一定要锻炼学会改别人的代码
  
187            
  
188            
  
189             // 获得如2012-09-17日期目录下的所有文件
  
190             Path[] listedPaths = FileUtil.stat2Paths(localStatus);
  
191 //            同样,但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
  
192            
  
193 //            先获取,listedPaths=Path
  
194 //            先获取2012-09-17下的所有,这个不多赘述啦!
  
195            
  
196 //            再获取,listedPaths=Path
  
197 //            listedPaths是[file:/D:/data/73/2012-09-18/ars10767@20120918131500.txt
  
198 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918133000.txt
  
199 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918134500.txt
  
200 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918140000.txt
  
201 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918141500.txt
  
202 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918143000.txt
  
203 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918144500.txt
  
204 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918150000.txt
  
205 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918151500.txt
  
206 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918153000.txt
  
207 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918154500.txt
  
208 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918160000.txt
  
209 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918161500.txt
  
210 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918163000.txt
  
211 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918164500.txt
  
212 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918170000.txt
  
213 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918171500.txt
  
214 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918173000.txt
  
215 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918174500.txt
  
216 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918180000.txt
  
217 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918181500.txt
  
218 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918183000.txt
  
219 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918184500.txt]
  
220            
  
221             //输出路径
  
222             Path block = new Path("hdfs://djt002:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt");
  
223             //fileName是"fileName"
  
224 //            block=Path
  
225 //            block是hdfs://djt002:9000/outData/MergeSmallFilesToHDFS/20120918.txt
  
226            
  
227             // 打开输出流
  
228             out = fs.create(block);//因为,合并小文件之后,比如这是,合并2012-09-17日期目录下的所有小文件,之后,要上传到HDFS里。
  
229 //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码out = fs.create(block);是相当于是,内存里写到文件B。所以是输出流,即是从内存里输出的,所以叫输出流。
  
230 //                            这里,文件A是Local                文件B是HDFS
  
231
  
232 //                                        文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
  
233            
  
234 //            很多人搞不清输入流和输出流,!!!!
  
235 //            其实啊,输入流、输出流都是针对内存的
  
236 //            往内存里写,是输入流。
  
237 //            内存往文件里写,是输出Luis。
  
238 //            
  
239 //            比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
  
240 //               =>   则文件A写到内存里,叫输入流。
  
241 //               =>    则内存里写到文件B,叫输出流   
  
242            
  
243            
  
244             for (Path p : listedPaths) {//for星型循环,即将listedPaths的值一一传给Path p
  
245             //先获取2012-09-17下的所有,这个不多赘述啦!
  
246             //现在,获取到2012-09-18下了
  
247 //            p=Path
  
248 //            p是file:/D:/data/73/2012-09-18/ars10767@20120918134500.txt
  
249 //            得一个一个来,这才叫做一一传给Path p
  
250               
  
251               in = local.open(p);// 打开输入流in
  
252 //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码in = local.open(p);是相当于是,文件A写到内存里。所以是输如流,即是写到内存里的,所以叫输入流。
  
253 //                  这里,文件A是Local                文件B是HDFS
  
254               
  
255               IOUtils.copyBytes(in, out, 4096, false); // 复制数据,IOUtils.copyBytes可以方便地将数据写入到文件,不需要自己去控制缓冲区,也不用自己去循环读取输入源。false表示不自动关闭数据流,那么就手动关闭。
  
256 //                IOUtils.copyBytes这个方法很重要
  
257                                 //是否自动关闭输入流和输出流,若是false,就要单独去关闭。则不在这个方法体里关闭输入和输出流了。
  
258 //                                                   若是true,则在这个方法里关闭输入和输出流。不需单独去关闭了
  
259               
  
260               
  
261 //                明白,IOUtils类的copyBytes将hdfs数据流拷贝到标准输出流System.out中,
  
262 //                copyBytes前两个参数好理解,一个输入,一个输出,第三个是缓存大小,第四个指定拷贝完毕后是否关闭流。
  
263 //                要设置为false,标准输出流不关闭,我们要手动关闭输入流。即,设置为false表示关闭输入流
  
264               
  
265 //                主要是把最后的这个参数定义好, 就可以了。 定义为true还是false,则决定着是否在这个方法体里关闭
  
266 //                若定义为true,则在这个方法体里直接关闭输入流、输出流。不需单独去关闭了
  
267 //                若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了
  
268               
  
269               
  
270               // 关闭输入流
  
271               in.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输入流!!!懂了吗
  
272             }
  
273             if (out != null) {//这里为什么不为空,空指针,则说明里面还有资源。
  
274               // 关闭输出流
  
275               out.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输出流!!!懂了吗
  
276             }
  
277         }
  
278         
  
279   }
  
280
  
281   /**
  
282      *
  
283      * @function 过滤 regex 格式的文件
  
284      *
  
285      */

  
286   public static>  
287         private final String regex;//变量
  
288
  
289         public RegexExcludePathFilter(String regex) {//这个是上面的那个,正在表达式
  
290             this.regex = regex;//将String regex的值,赋给RegexExcludePathFilter类里的private final String regex的值
  
291         }
  
292
  
293         public boolean accept(Path path) {//主要是实现accept方法
  
294             // TODO Auto-generated method stub
  
295             boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*svn$
  
296             return !flag;
  
297         }
  
298
  
299   }
  
300
  
301   /**
  
302      *
  
303      * @function 接受 regex 格式的文件
  
304      *
  
305      */

  
306   public static>  
307         private final String regex;//变量
  
308
  
309         public RegexAcceptPathFilter(String regex) {//这个是上面的那个,正在表达式
  
310             this.regex = regex;//将String regex的值,赋给RegexAcceptPathFilter类里的private final String regex的值
  
311         }
  
312
  
313         public boolean accept(Path path) {//主要是实现accept方法
  
314             // TODO Auto-generated method stub
  
315             boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*txt$
  
316             return flag;
  
317         }
  
318
  
319   }
  
320 }
页: [1]
查看完整版本: Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)