xsmscb posted on 2016-12-11 11:34:24

Reading Notes 3: Hadoop in Action (1): Building a MapReduce Template

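import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Written against the old org.apache.hadoop.mapred API.
public class MapReduceDemo extends Configured implements Tool {

    /** Mapper: swaps key and value, emitting (value, key). */
    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            output.collect(value, key);
        }
    }

    /** Reducer: joins all values for a key into one comma-separated string. */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            StringBuilder csv = new StringBuilder();
            while (values.hasNext()) {
                if (csv.length() > 0) {
                    csv.append(",");
                }
                csv.append(values.next().toString());
            }
            output.collect(key, new Text(csv.toString()));
        }
    }

    /**
     * The core of the template, called the driver: it configures and submits the job.
     * @param args args[0] is the input path, args[1] is the output path
     * @return 0 when the job completes (JobClient.runJob throws an IOException if the job fails)
     * @throws Exception
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MapReduceDemo.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MapReduceDemo");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // KeyValueTextInputFormat splits each line into key and value at the first ",".
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options such as -D before calling run().
        int res = ToolRunner.run(new Configuration(), new MapReduceDemo(), args);
        System.exit(res);
    }
}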
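To see what this job computes without a cluster, its data flow can be imitated in plain Java. This is a minimal sketch, not Hadoop code: the class CitationInversionSketch and its methods are hypothetical names, the input lines are made-up sample data, and a TreeMap stands in for the shuffle/sort step. One real difference: Hadoop sorts the keys but does not guarantee the order of values within a key, so on a cluster the joined list may come out in a different order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java imitation of the MapReduceDemo data flow.
 * No Hadoop classes are used; this only mimics map -> shuffle/sort -> reduce.
 */
public class CitationInversionSketch {

    /** Mimics KeyValueTextInputFormat: split at the FIRST separator only. */
    static String[] splitKeyValue(String line, char sep) {
        int i = line.indexOf(sep);
        if (i < 0) {
            // No separator: the whole line becomes the key, the value is empty.
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, i), line.substring(i + 1)};
    }

    /** Runs the simulated job over the given input lines. */
    static Map<String, String> run(List<String> lines) {
        // Map phase emits (value, key); grouping into a TreeMap imitates the
        // shuffle, which groups values by key and sorts the keys.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = splitKeyValue(line, ',');
            grouped.computeIfAbsent(kv[1], k -> new ArrayList<>()).add(kv[0]);
        }
        // Reduce phase: join all values for each key with commas.
        Map<String, String> output = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            output.put(e.getKey(), String.join(",", e.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up sample input in "key,value" form.
        List<String> input = List.of("1001,2001", "1002,2001", "1003,2002");
        // TextOutputFormat writes "key<TAB>value", one pair per line.
        run(input).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

Running main prints each key followed by a tab and its comma-joined values, the same line layout TextOutputFormat produces by default.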

Running the program:
bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo input/cite75_99.txt output
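If you only want to see the mapper's output, set the number of reducers to 0. Because the driver runs through ToolRunner, this can be done on the command line with -D:
bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo -D mapred.reduce.tasks=0 input/cite75_99.txt output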
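The -D option works because ToolRunner hands the arguments to Hadoop's GenericOptionsParser, which applies generic options to the Configuration and passes only the leftover arguments to run(); that is why args[0] and args[1] still refer to the input and output paths. The sketch below imitates that split in plain Java. It is a hypothetical simplification, not the real parser: GenericOptionsSketch, Parsed and parse are made-up names, only the separated form "-D name=value" is handled, and the fallback of 1 reducer when the property is absent is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, heavily simplified imitation of separating "-D name=value"
 * generic options from the application's own arguments. Hadoop's real
 * GenericOptionsParser supports many more options than this.
 */
public class GenericOptionsSketch {

    /** Configuration overrides plus the arguments left for run(). */
    static final class Parsed {
        final Map<String, String> conf = new LinkedHashMap<>();
        final List<String> remaining = new ArrayList<>();
    }

    static Parsed parse(String[] args) {
        Parsed p = new Parsed();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                // Consume the next token as name=value.
                String[] nv = args[++i].split("=", 2);
                p.conf.put(nv[0], nv.length > 1 ? nv[1] : "");
            } else {
                p.remaining.add(args[i]);
            }
        }
        return p;
    }

    public static void main(String[] args) {
        // Arguments as main() would receive them from the -D command above
        // (assuming an input path of input/cite75_99.txt).
        String[] cli = {"-D", "mapred.reduce.tasks=0", "input/cite75_99.txt", "output"};
        Parsed p = parse(cli);
        // run() would see only the remaining args: [0] = input, [1] = output.
        System.out.println("input  = " + p.remaining.get(0));
        System.out.println("output = " + p.remaining.get(1));
        // With zero reduce tasks the job is map-only: mapper output is written
        // straight to the output directory, without shuffle, sort, or grouping.
        // The fallback of "1" is an assumption of this sketch.
        int reducers = Integer.parseInt(p.conf.getOrDefault("mapred.reduce.tasks", "1"));
        System.out.println(reducers == 0 ? "map-only job" : "map + reduce job");
    }
}
```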