jane27 发表于 2017-12-17 12:40:51

Hadoop Demo 倒排索引

  package com.asin.hdp.inverted;
  import java.io.IOException;   
  import java.util.StringTokenizer;
  import org.apache.hadoop.conf.Configuration;   
  import org.apache.hadoop.fs.Path;   
  import org.apache.hadoop.io.IntWritable;   
  import org.apache.hadoop.io.LongWritable;   
  import org.apache.hadoop.io.Text;   
  import org.apache.hadoop.mapreduce.InputSplit;   
  import org.apache.hadoop.mapreduce.Job;   
  import org.apache.hadoop.mapreduce.Mapper;   
  import org.apache.hadoop.mapreduce.Reducer;   
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public>  public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();   
  Job job = Job.getInstance(conf);   
  job.setJarByClass(InvertedIndexCombine.class);
  job.setMapperClass(invertedMapper.class);   
  job.setCombinerClass(invertedCombine.class);   
  job.setReducerClass(invertedReduce.class);
  job.setOutputKeyClass(Text.class);   
  job.setOutputValueClass(Text.class);
  FileInputFormat.addInputPath(job, new Path("e:/a.txt"));   
  FileInputFormat.addInputPath(job, new Path("e:/b.txt"));   
  FileInputFormat.addInputPath(job, new Path("e:/c.txt"));   
  FileOutputFormat.setOutputPath(job, new Path("e:/outputCombine"));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  public static>  @Override   
  protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)   
  throws IOException, InterruptedException {
  FileSplit split = (FileSplit) context.getInputSplit();   
  Path path = split.getPath();   
  String name = path.getName().replace("e:/", "");
  StringTokenizer token = new StringTokenizer(value.toString(), " ");   
  while (token.hasMoreTokens()) {
  context.write(new Text(name + "\t" + token.nextToken()), new Text("1"));   
  }
  }   
  }

  public static>  @Override   
  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)   
  throws IOException, InterruptedException {
  String line = key.toString();   
  String[] split = line.split("\t");
  int sum = 0;   
  for (Text text : values) {   
  sum += Integer.parseInt(text.toString());   
  }   
  context.write(new Text(split), new Text(split + ":" + sum));
  }   
  }

  public static>  @Override   
  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)   
  throws IOException, InterruptedException {
  String val = "";   
  for (Text text : values) {   
  val += text + "\t";   
  }
  context.write(new Text(key), new Text(val));   
  }   
  }
  }
页: [1]
查看完整版本: Hadoop Demo 倒排索引