Computing an Average with Hadoop
The combiner runs locally on each node, while the reducer collects everything. Suppose, for example, a file is 1 GB, which comes to 16 blocks at the then-default 64 MB HDFS block size, and the cluster has 5 dual-core machines, i.e. 10 map slots. The 16 blocks are spread across those 10 slots, so the map phase takes roughly 2 waves. Say blocks 1 and 2 go to machine 1 and blocks 3 and 4 go to machine 2: once blocks 1 and 2 have been summed, a combiner runs on machine 1; once blocks 3 and 4 are done, another combiner runs on machine 2. After all combiners have finished, their output is shipped to the reducer for the final computation.
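This is also why the combiner below emits a sum-count pair instead of a local average: averages do not compose, since the mean of two partial means ignores how many records each side saw. A minimal standalone illustration (plain Java with made-up numbers, not part of the original post):

public class AverageMergeDemo {
    public static void main(String[] args) {
        // Hypothetical partial results: machine 1 saw 4 scores, machine 2 saw 1.
        double sum1 = 80 + 90 + 70 + 60;  // local sum = 300, count = 4
        double sum2 = 100;                // local sum = 100, count = 1

        // Wrong: averaging the two local averages weights both machines equally.
        double wrong = ((sum1 / 4) + (sum2 / 1)) / 2;  // (75 + 100) / 2 = 87.5

        // Right: merge (sum, count) pairs and divide once at the end.
        double right = (sum1 + sum2) / (4 + 1);        // 400 / 5 = 80.0

        System.out.println("average of averages: " + wrong);  // 87.5
        System.out.println("weighted average:    " + right);  // 80.0
    }
}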
Input (data taken from the internet):
data1:
data2:
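Each line of data1 and data2 is assumed to take the form name score (for example, zhangsan 88; these sample values are made up), since the mapper below treats the first token of a line as the key and the following token as the value.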
Program source code:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountAverage {
    public static class AverageMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is expected to look like "name score":
            // the first token becomes the key, the last token the value.
            String inputline = value.toString();
            StringTokenizer itr = new StringTokenizer(inputline);
            String mapkey = "";
            String mapvalue = "";
            int count = 0;
            while (itr.hasMoreTokens()) {
                if (count > 0) {
                    mapvalue = itr.nextToken();
                    continue;
                }
                mapkey = itr.nextToken();
                count++;
            }
            context.write(new Text(mapkey), new Text(mapvalue));
        }
    }

    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Pre-aggregate locally: emit "sum-count" so the reducer can build
            // a weighted average instead of averaging averages. Note that this
            // assumes the combiner runs exactly once per key on each map side;
            // Hadoop only guarantees it runs zero or more times.
            double sum = 0.00;
            int count = 0;
            for (Text t : values) {
                sum += Double.parseDouble(t.toString());
                count++;
            }
            context.write(new Text(key), new Text(sum + "-" + count));
        }
    }

    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Each value arrives from a combiner as "sum-count"; accumulate the
            // partial sums and counts, then divide once at the very end.
            double sum = 0.00;
            int count = 0;
            for (Text t : values) {
                String[] str = t.toString().split("-");
                sum += Double.parseDouble(str[0]);  // partial sum
                count += Integer.parseInt(str[1]);  // partial count
            }
            context.write(new Text(key), new DoubleWritable(sum / count));
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "count average");
        job.setJarByClass(CountAverage.class);
        job.setMapperClass(AverageMapper.class);
        job.setCombinerClass(AverageCombiner.class);
        job.setReducerClass(AverageReducer.class);
        // Map output (Text/Text) and final reduce output (Text/DoubleWritable)
        // differ, so the two pairs of types must be declared separately.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // The output path must be a directory that does not exist yet,
        // so no glob ("/*") is used on it.
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/pu/input/*"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/pu/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
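Assuming the class is packaged into a jar (the jar name here is hypothetical), the job would typically be launched with hadoop jar countaverage.jar org.apache.hadoop.examples.CountAverage, and the per-key averages then appear under /user/pu/output as Text/DoubleWritable pairs. One caveat about the design above: Hadoop treats a combiner as an optional optimization that may run zero, one, or several times, while this job only feeds the reducer correct input when the combiner runs exactly once. A sketch of a contract-safe variant (hypothetical, not from the original post, using the same imports as the listing above) is to have the mapper emit every score as a sum-count pair up front, so the combiner's output format matches its input format:

    // Hypothetical contract-safe variant: every value is already "sum-count",
    // so the combiner can safely run any number of times, or not at all.
    public static class SumCountMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            if (itr.countTokens() >= 2) {
                String name = itr.nextToken();
                String score = itr.nextToken();
                context.write(new Text(name), new Text(score + "-1")); // sum=score, count=1
            }
        }
    }

    public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (Text t : values) {
                String[] parts = t.toString().split("-");
                sum += Double.parseDouble(parts[0]);
                count += Integer.parseInt(parts[1]);
            }
            // Output format equals input format, so applying this step
            // zero or more times never changes the final result.
            context.write(key, new Text(sum + "-" + count));
        }
    }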
Run results: