buser 发表于 2017-12-18 15:01:40

Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和多种输出格式分析(三十一)

1 package zhouls.bigdata.myMapReduce.TVPlayCount;  

2  
3 import java.io.IOException;
  
4
  
5 import javax.swing.JComboBox.KeySelectionManager;
  
6
  
7 import org.apache.hadoop.conf.Configuration;
  
8 import org.apache.hadoop.conf.Configured;
  
9 import org.apache.hadoop.fs.FileSystem;
  
10 import org.apache.hadoop.fs.Path;
  
11 import org.apache.hadoop.io.Text;
  
12 import org.apache.hadoop.mapreduce.Job;
  
13 import org.apache.hadoop.mapreduce.Mapper;
  
14 import org.apache.hadoop.mapreduce.Reducer;
  
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
17 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  
19 import org.apache.hadoop.util.Tool;
  
20 import org.apache.hadoop.util.ToolRunner;
  
21
  
22 /**
  
23* @input params 各类网站每天每部电影的点播量、收藏量、 评论数 、反票数和支持数等数据的统计
  
24* @ouput params 分别输出每个网站 每部电视剧总的统计数据相关情况
  
25* @author zhouls
  
26* @function 自定义FileInputFormat 将电视剧的统计数据 根据不同网站以MultipleOutputs 输出到不同的文件夹下
  
27*/

  
28 public>  
29   /**
  
30      * @input Params Text TvPlayData
  
31      * @output Params Text TvPlayData
  
32      * @author zhouls
  
33      * @function 直接输出
  
34      */

  
35   public static>  
36         //k1,v1 k2,v2k1是Text型,v1是TVPlayData型,k2是Text型,v2是TVPlayData型
  
37         
  
38 //      因为,v1我们想要的是, 5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数。这是hadoop自带类型无法满足到的,所以,我们得自定义,即TVPlayData,专门来写。
  
39 //      里面的数据类型,五个指标放到一块输出的,那么这是什么类型?是不是hadoop默认的,无法满足,所以我们得自定义数据类型。
  
40         
  
41         @Override
  
42         protected void map(Text key, TVPlayData value, Context context)throws IOException, InterruptedException{   
  
43 //            k1,默认情况是行偏移量,当然也可以自定义key,即不是行偏移量,如这里。
  
44
  
45            
  
46 //            key=Text
  
47 //            Key是继承者们    1
  
48            
  
49 //            value=TVPlayData
  
50 //            value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
  
51 //            value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@719349c5,注意,这个每次不一样
  
52            
  
53            
  
54 //            key=Text
  
55 //            继承者们精华版    4
  
56            
  
57 //            value=TVPlayData
  
58 //            zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
  
59            
  
60             context.write(key,value);//写入key是k2,value是v2
  
61 //            直接TVPlayData那边自定义好了
  
62            
  
63 //            context.write(new Text(key),new TVPlayData(value));//等价      
  
64            
  
65         }
  
66   }
  
67   
  
68   
  
69   
  
70   /**
  
71      * @input Params Text TvPlayData
  
72      * @output Params Text Text
  
73      * @author zhouls
  
74      * @fuction 统计每部电视剧的 点播数 收藏数等按source输出到不同文件夹下
  
75      */

  
76   public static>  
77         private Text m_key = new Text();
  
78         private Text m_value = new Text();
  
79   
  
80         private MultipleOutputs<Text, Text> mos;//MultipleOutputs<output Key , output Value >是hadoop里的多目录输出或者说将结果输出到多个文件或目录下
  
81 //      MultipleOutputs,这是MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
  
82
  
83 //      Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,
  
84 //      如果需要人为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,
  
85 //      MultipleOutputs采用输出记录的键值对(output Key和output Value)或者任意字符串来生成输出文件的名字,
  
86 //      文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。
  
87         
  
88         
  
89         protected void setup(Context context) throws IOException,InterruptedException{   
  
90             mos = new MultipleOutputs<Text, Text>(context);//强制转换,将context的值,转换为MultipleOutputs<k3 , v3 >类型
  
91         }
  
92
  
93         protected void reduce(Text Key, Iterable<TVPlayData> Values,Context context) throws IOException, InterruptedException{
  
94             int daynumber = 0;
  
95             int collectnumber = 0;
  
96             int commentnumber = 0;
  
97             int againstnumber = 0;
  
98             int supportnumber = 0;
  
99            
  
100            
  
101             //迭代器
  
102             for (TVPlayData tv : Values) {//星型for循环,即把Values的值一一传给TVPlayData tv
  
103               daynumber += tv.getDaynumber();//拿值
  
104               collectnumber += tv.getCollectnumber();
  
105               commentnumber += tv.getCommentnumber();
  
106               againstnumber += tv.getAgainstnumber();
  
107               supportnumber += tv.getSupportnumber();
  
108             }
  
109            
  
110             //24小时第一季    1    258962    124    48    2    10
  
111             //tvnamesource    播放量 收藏数 评论数 踩数 赞数
  
112             //1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
  
113             String[] records = Key.toString().split("\t");//转换成String数组类型
  
114            
  
115             String source = records;// 媒体类别
  
116            
  
117             m_key.set(records);//赋值,将records的值,给,m_key
  
118            
  
119            
  
120             m_value.set(daynumber + "\t" + collectnumber + "\t" + commentnumber
  
121                     + "\t" + againstnumber + "\t" + supportnumber);
  
122             //赋值,将daynumber + "\t" + collectnumber + "\t" + commentnumber+ "\t" + againstnumber + "\t" + supportnumber的值,给m_value
  
123            
  
124            
  
125             if (source.equals("1")) {//判别1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
  
126               mos.write("youku", m_key, m_value);//写入,将"youku", m_key, m_value,写入到mos
  
127             } else if (source.equals("2")) {
  
128               mos.write("souhu", m_key, m_value);//写入
  
129             } else if (source.equals("3")) {
  
130               mos.write("tudou", m_key, m_value);//写入
  
131             } else if (source.equals("4")) {
  
132               mos.write("aiqiyi", m_key, m_value);//写入
  
133             } else if (source.equals("5")) {
  
134               mos.write("xunlei", m_key, m_value);//写入
  
135 //                m_key=Text
  
136 //                m_key是龟岩许浚
  
137 //               
  
138 //                m_value=Text
  
139 //                m_value是740014    0    0    44    84
  
140             }
  
141         }
  
142
  
143         
  
144         protected void cleanup(Context context) throws IOException,InterruptedException{
  
145 //            context=.WrappedReducer$Context
  
146 //            context是org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@4c566d9b
  
147            
  
148            
  
149             mos.close();//写完后,要关闭
  
150         }
  
151         
  
152         
  
153   }
  
154
  
155   
  
156   public int run(String[] args) throws Exception {
  
157         Configuration conf = new Configuration();// 配置文件对象
  
158         Path mypath = new Path(args);
  
159         FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
  
160         if (hdfs.isDirectory(mypath)) {
  
161             hdfs.delete(mypath, true);
  
162         }
  
163
  
164         Job job = new Job(conf, "tvplay");// 构造任务
  
165         job.setJarByClass(TVPlayCount.class);// 设置主类
  
166
  
167         job.setMapperClass(TVPlayMapper.class);// 设置Mapper
  
168         job.setMapOutputKeyClass(Text.class);// key输出类型
  
169         job.setMapOutputValueClass(TVPlayData.class);// value输出类型
  
170         job.setInputFormatClass(TVPlayInputFormat.class);//自定义输入格式
  
171
  
172         job.setReducerClass(TVPlayReducer.class);// 设置Reducer
  
173         job.setOutputKeyClass(Text.class);// reduce key类型
  
174         job.setOutputValueClass(Text.class);// reduce value类型
  
175         // 自定义文件输出格式
  
176         MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,
  
177               Text.class, Text.class);
  
178         MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,
  
179               Text.class, Text.class);
  
180         MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,
  
181               Text.class, Text.class);
  
182         MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,
  
183               Text.class, Text.class);
  
184         MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,
  
185               Text.class, Text.class);
  
186         
  
187         FileInputFormat.addInputPath(job, new Path(args));// 输入路径
  
188         FileOutputFormat.setOutputPath(job, new Path(args));// 输出路径
  
189         job.waitForCompletion(true);
  
190         return 0;
  
191   }
  
192   public static void main(String[] args) throws Exception {
  
193         
  
194         //集群模式下
  
195         String[] args0 = { "hdfs://HadoopMaster:9000/inputData/fiveTvWebsiteDlay/tvplay.txt",
  
196               "hdfs://HadoopMaster:9000/outData/fiveTvWebsiteDlay/" };
  
197         
  
198         //本地模式下
  
199 //      String[] args0 = { "./data/tvplay/tvplay.txt",
  
200 //                            "./out/tvplay" };
  
201         
  
202         
  
203         int ec = ToolRunner.run(new Configuration(), new TVPlayCount(), args0);
  
204         System.exit(ec);
  
205   }
  
206 }
  
207
页: [1]
查看完整版本: Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和多种输出格式分析(三十一)