古城堡 发表于 2017-12-18 12:03:02

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

1 package zhouls.bigdata.myMapReduce.Email;  

2  
3 import java.io.IOException;
  
4 import org.apache.hadoop.conf.Configuration;
  
5 import org.apache.hadoop.conf.Configured;
  
6 import org.apache.hadoop.fs.FileSystem;
  
7 import org.apache.hadoop.fs.Path;
  
8 import org.apache.hadoop.io.IntWritable;
  
9 import org.apache.hadoop.io.LongWritable;
  
10 import org.apache.hadoop.io.Text;
  
11 import org.apache.hadoop.mapreduce.Job;
  
12 import org.apache.hadoop.mapreduce.Mapper;
  
13 import org.apache.hadoop.mapreduce.Reducer;
  
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
16 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  
17 import org.apache.hadoop.util.Tool;
  
18 import org.apache.hadoop.util.ToolRunner;
  
19
  
20 //通过MultipleOutputs写到多个文件:参考博客http://www.cnblogs.com/codeOfLife/p/5452902.html
  
21
  
22 //    MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
  
23 //这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
  
24 //其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突。
  
25
  
26
  
27

  
28 public>
  
29   public static>  
30         private final static IntWritable one = new IntWritable(1);
  
31
  
32         @Override
  
33         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  
34             context.write(value, one);
  
35         }
  
36   }
  
37
  
38   

  
39   public static>  
40         private IntWritable result = new IntWritable();
  
41         private MultipleOutputs< Text, IntWritable> multipleOutputs;
  
42
  
43         @Override
  
44         protected void setup(Context context) throws IOException ,InterruptedException{
  
45             multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
  
46         }
  
47         
  
48         protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
  
49             int begin = Key.toString().indexOf("@");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。
  
50             int end = Key.toString().indexOf(".");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。只不过我们自己写出个end变量而已
  
51 //            Key.toString().indexOf(ch)
  
52 //            Key.toString().indexOf(str)
  
53 //            Key.toString().indexOf(ch, fromIndex)
  
54 //            Key.toString().indexOf(str, fromIndex)
  
55 //            Key.toString().intern()
  
56            
  
57 //            Java中字符串中子串的查找共有四种方法,如下:
  
58 //            1、int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引。
  
59 //            2、int indexOf(String str, int startIndex):从指定的索引处开始,返回第一次出现的指定子字符串在此字符串中的索引。
  
60 //            3、int lastIndexOf(String str) :返回在此字符串中最右边出现的指定子字符串的索引。
  
61 //            4、int lastIndexOf(String str, int startIndex) :从指定的索引处开始向后搜索,返回在此字符串中最后一次出现的指定子字符串的索引。
  
62            
  
63            
  
64             if(begin>=end){
  
65               return;
  
66             }
  
67            
  
68             //获取邮箱类别,比如 qq
  
69             String name = Key.toString().substring(begin+1, end);
  
70 //                        String.subString(start,end)截取的字符串包括起点所在的字符串,不包括终点所在的字符串
  
71            
  
72             int sum = 0;
  
73            
  
74             for (IntWritable value : Values) {
  
75               sum += value.get();
  
76             }
  
77            
  
78             result.set(sum);
  
79             multipleOutputs.write(Key, result, name);
  
80                         //这里,我们用到的是multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
  
81            
  
82 //            multipleOutputs.write默认有3种构造方法:
  
83 //            multipleOutputs.write(String namedOutput, K key, V value);
  
84 //            multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
  
85 //            multipleOutputs.write(String namedOutput, K key, V value,StringbaseOutputPath);
  
86               
  
87 //            MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
  
88 //            这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。
  
89 //             采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
  
90 //             其中 name 是由程序设定的任意名字,
  
91 //            nnnnn 是一个指明块号的整数(从 0 开始)。
  
92 //             块号保证从不同块(mapper 或 reducer)写的输出在相同名字情况下不会冲突。
  
93            
  
94         }
  
95         
  
96         @Override
  
97         protected void cleanup(Context context) throws IOException ,InterruptedException{
  
98             multipleOutputs.close();
  
99         }
  
100         
  
101   }
  
102
  
103   public int run(String[] args) throws Exception {
  
104         Configuration conf = new Configuration();// 读取配置文件
  
105         
  
106         Path mypath = new Path(args);
  
107         FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
  
108         if (hdfs.isDirectory(mypath)) {
  
109             hdfs.delete(mypath, true);
  
110         }
  
111         Job job = Job.getInstance();// 新建一个任务
  
112         job.setJarByClass(Email.class);// 主类
  
113         
  
114         FileInputFormat.addInputPath(job, new Path(args));// 输入路径
  
115         FileOutputFormat.setOutputPath(job, new Path(args));// 输出路径
  
116
  
117         job.setMapperClass(MailMapper.class);// Mapper
  
118         job.setReducerClass(MailReducer.class);// Reducer
  
119         
  
120         job.setOutputKeyClass(Text.class);// key输出类型
  
121         job.setOutputValueClass(IntWritable.class);// value输出类型
  
122         
  
123         job.waitForCompletion(true);
  
124         return 0;
  
125   }
  
126
  
127   public static void main(String[] args) throws Exception {
  
128         String[] args0 = {
  
129               "hdfs://HadoopMaster:9000/inputData/multipleOutputFormats/mail.txt",
  
130               "hdfs://HadoopMaster:9000/outData/MultipleOutputFormats/" };
  
131         int ec = ToolRunner.run(new Configuration(), new Email(), args0);
  
132         System.exit(ec);
  
133   }
  
134 }
页: [1]
查看完整版本: Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)