网络浪子 发表于 2018-10-29 12:59:58

Hadoop中WordCount代码-直接加载hadoop的配置文件

package com.apache.hadoop.function;  

  
import java.io.IOException;
  
import java.util.Iterator;
  
import java.util.StringTokenizer;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IntWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapred.FileInputFormat;
  
import org.apache.hadoop.mapred.FileOutputFormat;
  
import org.apache.hadoop.mapred.JobClient;
  
import org.apache.hadoop.mapred.JobConf;
  
import org.apache.hadoop.mapred.MapReduceBase;
  
import org.apache.hadoop.mapred.Mapper;
  
import org.apache.hadoop.mapred.OutputCollector;
  
import org.apache.hadoop.mapred.Reducer;
  
import org.apache.hadoop.mapred.Reporter;
  
import org.apache.hadoop.mapred.TextInputFormat;
  
import org.apache.hadoop.mapred.TextOutputFormat;
  
import org.apache.hadoop.mapreduce.Mapper.Context;
  

  
public class WordCount {
  public static class WordCountMapper extends MapReduceBase implements
  Mapper {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  

  @Override
  public void map(Object key, Text value,
  OutputCollector output, Reporter reporter)
  throws IOException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
  word.set(itr.nextToken());
  output.collect(word, one);
  }
  }
  }
  

  public static class WordCountReducer extends MapReduceBase implements
  Reducer {
  private IntWritable result = new IntWritable();
  

  @Override
  public void reduce(Text key, Iterator values,
  OutputCollector output, Reporter reporter)
  throws IOException {
  // TODO Auto-generated method stub
  int sum = 0;
  while (values.hasNext()) {
  sum += values.next().get();
  }
  result.set(sum);
  output.collect(key, result);
  }
  }
  

  public static void main(String[] args) throws Exception {
  String input = null;
  String output = null;
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("WordCount");
  // 测试环境
  if (args == null || args.length < 2) {
  input = "hdfs://192.168.1.220:9000/usr/input/wordcount/*";//程序的输入文件路径
  output = "hdfs://192.168.1.220:9000/usr/output/wordcount";//程序的输出文件路径
  

  conf.addResource("classpath:/hadoop/core-site.xml");//配置文件需要从hadoop中导出,保存到myeclipse的目录下
  conf.addResource("classpath:/hadoop/hdfs-site.xml");
  conf.addResource("classpath:/hadoop/mapred-site.xml");
  } else {
  // 正式环境
  input = args;
  output = args;
  }
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapperClass(WordCountMapper.class);
  conf.setCombinerClass(WordCountReducer.class);
  conf.setReducerClass(WordCountReducer.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(input));
  FileOutputFormat.setOutputPath(conf, new Path(output));
  JobClient.runJob(conf);
  System.exit(0);
  }
  
}


页: [1]
查看完整版本: Hadoop中WordCount代码-直接加载hadoop的配置文件