Posted by haloi on 2016-12-6 08:08:31

Computing an average with Hadoop

  The combiner runs locally on each node, while the reducer collects the results from all of them. For example, take a large file of about 1 GB: that is 16 blocks (e.g. with a 64 MB block size). If the cluster has 5 dual-core machines, there are 10 map slots, so the 16 blocks get spread over those 10 slots and need roughly 2 waves of map tasks. Suppose blocks 1 and 2 are assigned to machine 1 and blocks 3 and 4 to machine 2: once machine 1 has summed blocks 1 and 2 it runs a combiner over its local output, and machine 2 does the same after finishing blocks 3 and 4. When all the combiners are done, their outputs are shuffled to the reducer for the final computation.
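To make that division of labour concrete, here is a minimal plain-Java sketch (no Hadoop involved) of the arithmetic this post's combiner and reducer perform: each combiner ships a partial sum together with its count, and the reducer merges the partials. The scores and the two-machine layout are made up purely for illustration.

// Minimal sketch of the combine/merge arithmetic; all values are hypothetical.
public class AverageSketch {
    public static void main(String[] args) {
        // Each local combiner emits a partial "sum-count" pair for one key:
        // machine 1 saw scores 80 and 90, machine 2 saw a single 70.
        String[] partials = {"170.0-2", "70.0-1"};
        double sum = 0.0;
        int count = 0;
        for (String p : partials) {
            String[] parts = p.split("-");
            sum += Double.parseDouble(parts[0]);  // merge partial sums
            count += Integer.parseInt(parts[1]);  // merge partial counts
        }
        // (170.0 + 70.0) / (2 + 1) = 80.0, the true average.
        // Averaging the two local averages (85.0 and 70.0) would give 77.5 instead.
        System.out.println("average = " + sum / count);
    }
}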
 
 
Input (data taken from the internet):
data1:

 
data2:
 

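Judging from the mapper below, each input line holds a key (for example a student name) followed by a single numeric score, separated by whitespace. A hypothetical illustration of the format (not the original data):

zhangsan 88
lisi 99
wangwu 66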
Program source code:
 
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountAverage {

    // Mapper: each input line is "<name> <score>"; emit the name as the key
    // and the score (still as text) as the value.
    public static class AverageMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String inputLine = value.toString();
            StringTokenizer itr = new StringTokenizer(inputLine);
            String mapKey = "";
            String mapValue = "";
            int count = 0;
            while (itr.hasMoreTokens()) {
                if (count > 0) {
                    // Every token after the first is treated as the score;
                    // with well-formed input there is exactly one.
                    mapValue = itr.nextToken();
                    continue;
                }
                mapKey = itr.nextToken();
                count++;
            }
            context.write(new Text(mapKey), new Text(mapValue));
        }
    }

    // Combiner: pre-aggregate locally on each map node. Emit "sum-count" so the
    // reducer can still compute a correct global average from the partials.
    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.00;
            int count = 0;
            for (Text t : values) {
                sum = sum + Double.parseDouble(t.toString());
                count++;
            }
            context.write(key, new Text(sum + "-" + count));
        }
    }

    // Reducer: merge the partial "sum-count" pairs and output the overall average.
    // Note: this assumes every value went through the combiner; Hadoop does not
    // guarantee the combiner runs, so raw scores would not parse here.
    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.00;
            int count = 0;
            for (Text t : values) {
                String[] str = t.toString().split("-");
                sum += Double.parseDouble(str[0]);   // partial sum
                count += Integer.parseInt(str[1]);   // partial count
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "count average");
        job.setJarByClass(CountAverage.class);
        job.setMapperClass(AverageMapper.class);
        job.setCombinerClass(AverageCombiner.class);
        job.setReducerClass(AverageReducer.class);
        // Map/combine output is Text/Text; the final reduce output value is a DoubleWritable.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/pu/input/*"));
        // The output path must be a not-yet-existing directory, so no wildcard here.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/pu/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
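Assuming the class is packaged into a jar (the name countaverage.jar below is only a placeholder), the job can be launched with the standard hadoop jar command; since the input and output paths are hard-coded in main, no further arguments are needed:

hadoop jar countaverage.jar org.apache.hadoop.examples.CountAverage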

  


 
Run results:
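With the default TextOutputFormat, each line of the reducer output (e.g. part-r-00000 in the output directory) holds a key and its computed average separated by a tab, i.e. lines of the form <name><TAB><average>.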