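For example, a mapper written against the old API extends MapReduceBase and implements Mapper:
public class DistinctProcessor extends MapReduceBase implements Mapper<Text, Text, Text, Text> { ... }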
MapReduceBase declares the following methods, both of which do nothing by default:
public class MapReduceBase implements Closeable, JobConfigurable {
    // Default implementation: does nothing.
    public void close() throws IOException {
    }

    // Default implementation: does nothing.
    public void configure(JobConf job) {
    }
}
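Because MapReduceBase supplies empty configure and close methods, a subclass only has to write map, and overrides configure only when it needs settings from the job. The plain-Java sketch below imitates that arrangement so it can run without Hadoop. BaseTask, PrefixTask, TaskDriver and the key demo.prefix are hypothetical, and java.util.Properties stands in for JobConf. TaskDriver follows the order in which the framework drives a task: configure once, then map for each record, then close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for MapReduceBase + Mapper: configure/close are no-ops,
// so subclasses only have to implement map().
abstract class BaseTask {
    public void configure(Properties job) { }   // no-op by default
    public void close() { }                     // no-op by default
    public abstract void map(String key, String value, List<String> out);
}

// Overrides configure() because it needs a setting from the job.
final class PrefixTask extends BaseTask {
    private String prefix = "";

    @Override
    public void configure(Properties job) {
        // Read once, before any map() call; "demo.prefix" is a made-up key.
        prefix = job.getProperty("demo.prefix", "");
    }

    @Override
    public void map(String key, String value, List<String> out) {
        out.add(prefix + key + "=" + value);
    }
}

// Mimics the framework's call order: configure, map for every record, close.
final class TaskDriver {
    static List<String> run(BaseTask task, Properties job, String[][] records) {
        List<String> out = new ArrayList<>();
        task.configure(job);
        for (String[] r : records) {
            task.map(r[0], r[1], out);
        }
        task.close();
        return out;
    }
}
```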
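After creating the Mapper object, the MapReduce framework calls configure, so the implementation can read whatever it needs from job and use it in map. The framework also places information about the current map task in the JobConf for the implementation to use: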
Name                      Type     Description
mapred.job.id             String   The job id
mapred.jar                String   job.jar location in job directory
job.local.dir             String   The job-specific shared scratch space
mapred.tip.id             String   The task id
mapred.task.id            String   The task attempt id
mapred.task.is.map        boolean  Is this a map task
mapred.task.partition     int      The id of the task within the job
map.input.file            String   The filename that the map is reading from
map.input.start           long     The offset of the start of the map input split
map.input.length          long     The number of bytes in the map input split
mapred.work.output.dir    String   The task's temporary output directory
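map.input.file gives the file being read. This is particularly useful when files must be processed differently depending on their names: the mapper can decide what to do based on this value.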
mapred.tip.id and mapred.job.id give the task id and the job id. map.input.length is the number of bytes in the input split handled by this map task.
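Handling files differently by name usually comes down to inspecting map.input.file once in configure and branching in map. The helper below isolates that decision in plain Java so it can be tested without a cluster. InputFileRouter, its Source values, and the orders-/users- file-name prefixes are hypothetical assumptions about the input layout.

```java
// Hypothetical helper: decides how a mapper should treat its records based on
// the value of "map.input.file". The file-name prefixes are assumptions.
final class InputFileRouter {
    enum Source { ORDERS, USERS, UNKNOWN }

    // Last path component, e.g. "hdfs://nn:8020/data/orders-2024.txt" -> "orders-2024.txt".
    static String baseName(String inputFile) {
        int slash = inputFile.lastIndexOf('/');
        return slash >= 0 ? inputFile.substring(slash + 1) : inputFile;
    }

    // Classifies the input by file-name prefix; null (key absent) maps to UNKNOWN.
    static Source classify(String inputFile) {
        if (inputFile == null) {
            return Source.UNKNOWN;
        }
        String name = baseName(inputFile);
        if (name.startsWith("orders-")) {
            return Source.ORDERS;
        }
        if (name.startsWith("users-")) {
            return Source.USERS;
        }
        return Source.UNKNOWN;
    }
}
```

In a mapper extending MapReduceBase, configure(JobConf job) could store InputFileRouter.classify(job.get("map.input.file")) in a field, and map would then branch on that field instead of re-parsing the path for every record.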
Summary: with the context information available to the Mapper and Reducer, we can add extra checks and processing inside the map and reduce functions.
The MultipleOutputs class in org.apache.hadoop.mapred.lib lets a job write to additional named outputs besides its default output. The job configuration usage pattern is:
JobConf conf = new JobConf();
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setMapperClass(MOMap.class);
conf.setReducerClass(MOReduce.class);
...
// Defines an additional single text-based output named 'text' for the job
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
    LongWritable.class, Text.class);
// Defines an additional multi SequenceFile-based output named 'seq' for the job
MultipleOutputs.addMultiNamedOutput(conf, "seq",
    SequenceFileOutputFormat.class,
    LongWritable.class, Text.class);
...
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
...
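The two declarations differ in how they are used later: a single named output such as text is written through getCollector(name, reporter), while a multi named output such as seq also takes a multi name (A and B in the reducer) and yields a separate output per multi name. The toy model below captures that contract in plain Java. It is not Hadoop code, NamedOutputsModel and its methods are hypothetical, and the rules it enforces (undeclared names rejected, a multi name allowed only on multi outputs, name_multiName target naming) reflect our reading of the old-API MultipleOutputs and should be treated as assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model (NOT Hadoop code) of the declare-then-collect contract
// of the old-API MultipleOutputs. All names here are hypothetical.
final class NamedOutputsModel {
    private final Map<String, Boolean> declared = new HashMap<>(); // name -> isMulti
    private final Map<String, List<String>> written = new HashMap<>();

    void addNamedOutput(String name)      { declared.put(name, false); }
    void addMultiNamedOutput(String name) { declared.put(name, true); }

    // multiName == null selects the single-output form of getCollector.
    void collect(String name, String multiName, String record) {
        Boolean multi = declared.get(name);
        if (multi == null) {
            throw new IllegalArgumentException("Undefined named output '" + name + "'");
        }
        if (!multi && multiName != null) {
            throw new IllegalArgumentException("'" + name + "' is not a multi named output");
        }
        if (multi && (multiName == null || multiName.isEmpty())) {
            throw new IllegalArgumentException("'" + name + "' needs a multi name");
        }
        // Assumption: one logical output per (name, multiName) pair.
        String target = multi ? name + "_" + multiName : name;
        written.computeIfAbsent(target, k -> new ArrayList<>()).add(record);
    }

    Map<String, List<String>> written() { return written; }
}
```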
Usage in a Reducer:
public class MOReduce implements
    Reducer<LongWritable, Text, LongWritable, Text> {

    private MultipleOutputs mos;

    @Override
    public void configure(JobConf conf) {
        ...
        mos = new MultipleOutputs(conf);
    }

    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
                       OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        ...
        mos.getCollector("text", reporter).collect(key, new Text("Hello"));
        mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
        mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
        ...
    }

    @Override
    public void close() throws IOException {
        // Required: closes every writer opened by MultipleOutputs;
        // skipping it can leave the named outputs incomplete.
        mos.close();
        ...
    }
}
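The comment on mos.close() matters because MultipleOutputs keeps its record writers open for the life of the task, and close() is what closes them. The plain-Java sketch below (not Hadoop code; CloseDemo is hypothetical) shows the general failure mode with java.io.BufferedWriter: text that has been written but not yet flushed or closed has not reached the underlying sink.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

// Plain-Java illustration (not Hadoop code): buffered output only reaches
// the underlying sink when the writer is flushed or closed.
final class CloseDemo {
    // Returns what the sink holds before and after close().
    static String[] writeThenClose(String text) throws IOException {
        StringWriter sink = new StringWriter();
        BufferedWriter out = new BufferedWriter(sink); // default 8192-char buffer
        out.write(text);
        String beforeClose = sink.toString(); // short text is still in the buffer
        out.close();                          // flushes the buffer, then closes the sink
        String afterClose = sink.toString();
        return new String[] { beforeClose, afterClose };
    }
}
```

The same reasoning is why close() on the reducer must forward to mos.close(): if the task ends without it, whatever the named-output writers still hold may never be written.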