心心失意 发表于 2017-12-18 13:26:17

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)

1 package zhouls.bigdata.myMapReduce.TemperatureTest;  

2  
3 import java.io.IOException;
  
4
  
5 import org.apache.hadoop.io.IntWritable;
  
6 import org.apache.hadoop.io.LongWritable;
  
7 import org.apache.hadoop.io.Text;
  
8 import org.apache.hadoop.mapreduce.Mapper;
  
9 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
10 import org.apache.hadoop.conf.Configuration;
  
11 import org.apache.hadoop.conf.Configured;
  
12 import org.apache.hadoop.fs.FileSystem;
  
13 import org.apache.hadoop.fs.Path;
  
14 import org.apache.hadoop.io.IntWritable;
  
15 import org.apache.hadoop.io.Text;
  
16 import org.apache.hadoop.mapreduce.Job;
  
17 import org.apache.hadoop.mapreduce.Mapper;
  
18 import org.apache.hadoop.mapreduce.Reducer;
  
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
22 import org.apache.hadoop.util.Tool;
  
23 import org.apache.hadoop.util.ToolRunner;
  
24
  
25
  
26 /*
  
27 Hadoop内置的数据类型:
  
28   BooleanWritable:标准布尔型数值
  
29   ByteWritable:单字节数值
  
30   DoubleWritable:双字节数值
  
31   FloatWritable:浮点数
  
32   IntWritable:整型数
  
33   LongWritable:长整型数
  
34   Text:使用UTF8格式存储的文本
  
35   NullWritable:当<key, value>中的key或value为空时使用
  
36 */
  
37
  
38
  
39 /**
  
40* 统计美国每个气象站30年来的平均气温
  
41* 1、编写map()函数
  
42* 2、编写reduce()函数
  
43* 3、编写run()执行方法,负责运行MapReduce作业
  
44* 4、在main()方法中运行程序
  
45*
  
46* @author zhouls
  
47*
  
48*/
  
49                         //继承Configured类,实现Tool接口

  
50 public>
  
51   public static>  
52                                 //输入的key,输入的value,输出的key,输出的value
  
53         //输入的LongWritable键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
  
54         
  
55 //      在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。
  
56         
  
57         
  
58 //      map 函数的功能仅限于提取气象站和气温信息
  
59         
  
60         /**
  
61          * @function Mapper 解析气象站数据
  
62          * @input key=偏移量value=气象站数据
  
63          * @output key=weatherStationId value=temperature
  
64          */
  
65         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
  
66                                 //map()函数还提供了context实例,用于键值对的输出或者说 map() 方法还提供了 Context 实例用于输出内容的写入
  
67            
  
68 //                              就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。
  
69 //                同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
  
70            
  
71            
  
72             //第一步,我们将每行气象站数据转换为每行的String类型
  
73             String line = value.toString(); //每行气象数据
  
74 //            values是1980 12 01 00    78   -17 10237   180    21   1   0   0
  
75 //            line是"1980 12 01 00    78   -17 10237   180    21   1   0   0"
  
76            
  
77            
  
78             //第二步:提取气温值
  
79             int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
  
80                                  //需要转换为整形,截取第14位到19位,从第0位开始,trim()的功能是去掉首尾空格。
  
81                                 //substring()方法截取我们业务需要的值
  
82            
  
83 //            substring(start, stop)其内容是从 start 处到 stop-1 处的所有字符,其长度为 stop 减 start。
  
84            
  
85 //            如Hello world!    若是substring(3,7)      则是lo w
  
86            
  
87 //                              Integer.parseInt() 返回的是一个int的值。在这里, 给temperature
  
88            
  
89 //             new Integer.valueof()返回的是Integer的对象。
  
90 //             Integer.parseInt() 返回的是一个int的值。
  
91 //             new Integer.valueof().intValue();返回的也是一个int的值。
  
92            
  
93            
  
94            
  
95            
  
96            
  
97 //            1980 12 01 00    78   -17 10237   180    21   1   0   0
  
98                           //78是气温值
  
99            
  
100 //            temperature是78
  
101            
  
102 //            30yr_03103.dat
  
103 //            30yr_03812.dat
  
104 //            30yr_03813.dat
  
105 //            30yr_03816.dat
  
106 //            30yr_03820.dat
  
107 //            30yr_03822.dat
  
108 //            30yr_03856.dat
  
109 //            30yr_03860.dat
  
110 //            30yr_03870.dat
  
111 //            30yr_03872.dat
  
112            
  
113            
  
114 //            (0,1985 07 31 02   200    94 10137   220    26   1   0 -9999)
  
115 //            (62,1985 07 31 03   172    94 10142   240   0   0   0 -9999)
  
116 //            (124,1985 07 31 04   156    83 10148   260    10   0   0 -9999)
  
117 //            (186,1985 07 31 05   133    78 -9999   250   0 -9999   0 -9999)
  
118 //            (248,1985 07 31 06   122    72 -9999    90   0 -9999   0   0)
  
119 //            (310,1985 07 31 07   117    67 -9999    60   0 -9999   0 -9999)
  
120 //            (371,1985 07 31 08   111    61 -9999    90   0 -9999   0 -9999)
  
121 //            (434,1985 07 31 09   111    61 -9999    60   5 -9999   0 -9999)
  
122 //            (497,1985 07 31 10   106    67 -9999    80   0 -9999   0 -9999)
  
123 //            (560,1985 07 31 11   100    56 -9999    50   5 -9999   0 -9999)
  
124            
  
125 //            (03103,)
  
126            
  
127 //            根据自己业务需要 , map 函数的功能仅限于提取气象站和气温信息
  
128            
  
129            
  
130 //            1998      #year
  
131 //            03            #month
  
132 //            09            #day
  
133 //            17            #hour
  
134 //            11            #temperature            感兴趣
  
135 //            -100      #dew
  
136 //            10237      #pressure
  
137 //            60            #wind_direction   
  
138 //            72            #wind_speed
  
139 //            0            #sky_condition   
  
140 //            0            #rain_1h
  
141 //            -9999      #rain_6h
  
142            
  
143            
  
144             if (temperature != -9999){//过滤无效数据   
  
145               //第三步:提取气象站编号
  
146               //获取输入分片
  
147               FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取问加你输入分片,并转换类型
  
148 //                  即由InputSplit   ->   FileSplit
  
149               
  
150 //                context.getInputSplit()
  
151 //                (FileSplit) context.getInputSplit()这是强制转换
  
152 //                fileSplit的值是file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/temperature/30yr_03870.dat:0+16357956
  
153 //                即,读的是30yr_03870.dat这个文件
  
154               
  
155               
  
156               //然后通过文件名称提取气象站编号
  
157               String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
  
158                         //首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号
  
159 //                        fileSplit.getPath()
  
160 //                        fileSplit.getPath().getName()
  
161               
  
162 //                30yr_03870.dat   我们只需获取03870就是气象站编号
  
163
  
164 //                        fileSplit.getPath().getName().substring(5, 10)   //从0开始,即第5个开始截取,到第10个为止,第10个没有拿到,所以为03870
  
165 //                weatherStationId是03870
  
166               
  
167               
  
168               
  
169               context.write(new Text(weatherStationId), new IntWritable(temperature));//写入weatherStationId是k2,temperature是v2
  
170 //                context.write(weatherStationId,temperature);等价    ,但是若是直接这样写会出错,因为,    weatherStationId是String类型,注意与Text类型还是有区别的!      
  
171                         //气象站编号,气温值
  
172             }
  
173         }
  
174   }
  
175
  
176
  
177   

  
178   public static>  
179         private IntWritable result = new IntWritable();//存取结果
  
180               //因为气温是IntWritable类型                     
  
181         public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
  
182 //            Iterable<IntWritable> values是iterable(迭代器)变量
  
183            
  
184            
  
185 //            Iterable<IntWritable> values和IntWritable values这样有什么区别?
  
186 //            前者是iterable(迭代器)变量,后者是intwriteable(int的封装)变量
  
187            
  
188            
  
189 //            Iterable<IntWritable> values
  
190 //            迭代器,valuses是 iterable(迭代器)变量,类型是IntWritable
  
191            
  
192            
  
193                         //reduce输出的key,key的集合,context的实例
  
194             //第一步:统计相同气象站的所有气温
  
195             int sum = 0;
  
196             int count = 0;
  
197             for (IntWritable val : values) //星型for循环来循环同一个气象站的所有气温值,即将values的值一一传给IntWritable val
  
198 //                IntWritable val是IntWritable(int的封装)变量
  
199               
  
200             {//对所有气温值累加
  
201             sum += val.get();//去val里拿一个值,就sum下
  
202
  
203 //            val.get()去拿值
  
204            
  
205               count++;
  
206             }
  
207             result.set(sum / count);//设为v3
  
208 //            result.set(sum / count)去设置,将sum / count的值,设给result
  
209 //            sum是21299119   count是258616=82.3580869
  
210            
  
211            
  
212             context.write(key,result);//写入key是k3,result是v3
  
213         }
  
214   }
  
215
  
216   
  
217   
  
218   public int run(String[] args) throws Exception{
  
219         // TODO Auto-generated method stub
  
220         //第一步:读取配置文件
  
221         Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了
  
222         //Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
  
223         
  
224 //                new Configuration()
  
225
  
226         //第二步:输出路径存在就先删除
  
227         Path mypath = new Path(args);//定义输出路径的Path对象,mypath
  
228         
  
229         
  
230 //      new Path(args)将args的值,给mypath
  
231         
  
232         FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。
  
233         //FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs
  
234         
  
235         if (hdfs.isDirectory(mypath))//如果输出路径存在
  
236         {
  
237             hdfs.delete(mypath, true);//则就删除
  
238         }
  
239         //第三步:构建job对象
  
240         Job job = new Job(conf, "temperature");//新建一个任务,job名字是tempreature
  
241         
  
242 //      new Job(conf, "temperature")有这么个构造方法
  
243         
  
244         job.setJarByClass(Temperature.class);// 设置主类
  
245         //通过job对象来设置主类Temperature.class
  
246         
  
247         //第四步:指定数据的输入路径和输出路径
  
248         FileInputFormat.addInputPath(job, new Path(args));// 输入路径,args
  
249         FileOutputFormat.setOutputPath(job, new Path(args));// 输出路径,args
  
250         
  
251         //第五步:指定Mapper和Reducer
  
252         job.setMapperClass(TemperatureMapper.class);// Mapper
  
253         job.setReducerClass(TemperatureReducer.class);// Reducer
  
254         
  
255         //第六步:设置map函数和reducer函数的输出类型
  
256         job.setOutputKeyClass(Text.class);
  
257         job.setOutputValueClass(IntWritable.class);   
  
258         
  
259         //第七步:提交作业
  
260         return job.waitForCompletion(true)?0:1;//提交任务
  
261   }
  
262
  
263
  
264   /**
  
265      * @function main 方法
  
266      * @param args
  
267      * @throws Exception
  
268      */
  
269   public static void main(String[] args) throws Exception {
  
270         //第一步
  
271 //      String[] args0 =
  
272 //            {
  
273 //            "hdfs://djt002:9000/inputData/temperature/",
  
274 //                  "hdfs://djt002:9000/outData/temperature/"
  
275 //            };
  
276         
  
277         String[] args0 = {"./data/temperature/","./out/temperature/"};
  
278         
  
279 //      args0是输入路径和输出路径的属组
  
280         
  
281         //第二步
  
282         int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
  
283         
  
284 //      ToolRunner.run(new Configuration(), new Temperature(), args0)有这么一个构造方法
  
285         
  
286         //第一个参数是读取配置文件,第二个参数是主类Temperature,第三个参数是输入路径和输出路径的属组
  
287         System.exit(ec);
  
288   }
  
289
  
290 }
  
291
页: [1]
查看完整版本: Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)