|
package com.lch.find;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * MapReduce job that finds the largest floating-point number in a text input
 * holding one number per line.
 *
 * <p>Every map call emits its parsed value under one shared key, so a single
 * reduce call receives all values and writes the maximum.
 *
 * <p>Usage: {@code SearchMaxValue [generic options] [<input path> <output path>]}.
 * When no paths are given, the built-in default HDFS locations are used.
 */
public class SearchMaxValue {

    /** Input file used when no paths are supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "hdfs://192.168.1.136:9000/tmp/test.txt";

    /** Output directory used when no paths are supplied on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "hdfs://192.168.1.136:9000/tmp/outputs";

    /**
     * Parses each input line as a float and emits it under a single constant key.
     */
    public static class SearchMaxValueMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> {

        /** The one key shared by every emitted value, so all values meet in one reduce call. */
        private static final IntWritable SHARED_KEY = new IntWritable(1);

        /** Reused output holder; avoids allocating a new writable for every record. */
        private final FloatWritable parsedValue = new FloatWritable();

        /**
         * Emits the number found on one input line.
         *
         * @param key     the key supplied by the input format (unused)
         * @param value   the text of the line
         * @param context the task context used to emit the parsed value
         * @throws NumberFormatException if a non-blank line is not a valid float
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            // A blank line carries no number; parsing it would fail the whole task.
            if (line.isEmpty()) {
                return;
            }
            parsedValue.set(Float.parseFloat(line));
            context.write(SHARED_KEY, parsedValue);
        }
    }

    /**
     * Scans all values of a key and writes the largest one.
     */
    public static class SearchMaxValueReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {

        /**
         * Writes the maximum of {@code values}, or a label with a null value when
         * {@code values} is empty.
         *
         * @param key     the shared key emitted by the mapper (unused)
         * @param values  the numbers collected for the key
         * @param context the task context used to emit the result
         */
        @Override
        public void reduce(IntWritable key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            Iterator<FloatWritable> it = values.iterator();
            if (!it.hasNext()) {
                context.write(new Text("Max Value : "), null);
                return;
            }

            // Seed with the first element so inputs made only of negative numbers work.
            float maxFloat = it.next().get();
            while (it.hasNext()) {
                float candidate = it.next().get();
                if (candidate > maxFloat) {
                    maxFloat = candidate;
                }
            }
            context.write(new Text("Max value is :"), new FloatWritable(maxFloat));
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args optional generic Hadoop options followed by either no paths
     *             (defaults are used) or exactly an input path and an output path
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Parse the real command line so generic options (-D, -fs, ...) are applied to conf.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        String inputPath;
        String outputPath;
        if (otherArgs.length == 0) {
            inputPath = DEFAULT_INPUT_PATH;
            outputPath = DEFAULT_OUTPUT_PATH;
        } else if (otherArgs.length == 2) {
            inputPath = otherArgs[0];
            outputPath = otherArgs[1];
        } else {
            System.err.println("Usage: SearchMaxValue [<input path> <output path>]");
            System.exit(2);
            return;
        }

        // NOTE(review): newer Hadoop releases prefer Job.getInstance(conf, name); the
        // constructor is kept because the target Hadoop version is not visible here.
        Job job = new Job(conf, "SearchValue");
        job.setJarByClass(SearchMaxValue.class);
        job.setMapperClass(SearchMaxValueMapper.class);
        job.setReducerClass(SearchMaxValueReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
代码比较简单，不多解释。
附上测试数据（每行一个数值）：
12.5
85.0
236
239
23
2
0
1
9
236
232.0 |
|
|