Hadoop Library MapReduce Classes

Hadoop comes with a set of Mappers and Reducers for commonly used functions. This post explains the usage of these built-in Mappers and Reducers in Hadoop v1.1.1.
 


  • ChainMapper, ChainReducer   This pair runs a chain of mappers in a single mapper, and runs a reducer followed by a chain of mappers in a single reducer. This can substantially reduce the disk I/O incurred compared with running multiple MapReduce jobs. Since the new-API ChainMapper and ChainReducer are missing from Hadoop 1.1.1 and there is no proper patch for the current stable version, the example below uses the old API; I have not compiled or run it on my local cluster. A sketch of the equivalent new-API driver follows the example.
    /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.ChainMapper;
    import org.apache.hadoop.mapred.lib.ChainReducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    /**
    * Sample program for ChainMapper/ChainReducer.
    */
    public class ChainWordCount extends Configured implements Tool {
    public static class Tokenizer extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output,
    Reporter reporter) throws IOException {
    String line = value.toString();
    System.out.println("Line:"+line);
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
    }
    }
    }
    public static class UpperCaser extends MapReduceBase
    implements Mapper<Text, IntWritable, Text, IntWritable> {
    public void map(Text key, IntWritable value,
    OutputCollector<Text, IntWritable> output,
    Reporter reporter) throws IOException {
    String word = key.toString().toUpperCase();
    System.out.println("Upper Case:"+word);
    output.collect(new Text(word), value);
    }
    }
    public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
    OutputCollector<Text, IntWritable> output,
    Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
    sum += values.next().get();
    }
    System.out.println("Word:"+key.toString()+"\tCount:"+sum);
    output.collect(key, new IntWritable(sum));
    }
    }
    static int printUsage() {
    System.out.println("wordcount  ");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
    }
    public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), ChainWordCount.class);
    conf.setJobName("wordcount");
    if (args.length != 2) {
    System.out.println("ERROR: Wrong number of parameters: " +
    args.length + " instead of 2.");
    return printUsage();
    }
    FileInputFormat.setInputPaths(conf, args[0]);
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    JobConf mapAConf = new JobConf(false);
    ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);
    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);
    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
    JobClient.runJob(conf);
    return 0;
    }
    public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args);
    System.exit(res);
    }
    }
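     For reference, here is a hedged sketch of the equivalent driver written against the new-API ChainMapper/ChainReducer in org.apache.hadoop.mapreduce.lib.chain, which ship with later Hadoop releases (2.x and above). It assumes Tokenizer, UpperCaser and Reduce have been ported to the new org.apache.hadoop.mapreduce API, and it has not been compiled or run here.
    // Hedged sketch, new API (Hadoop 2.x+); mapper/reducer classes are assumed to be new-API ports,
    // and FileInputFormat/FileOutputFormat are the new-API lib.input/lib.output classes.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "chainwordcount");
    job.setJarByClass(ChainWordCount.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Each stage declares its input/output key and value classes and gets a private Configuration.
    ChainMapper.addMapper(job, Tokenizer.class,
        LongWritable.class, Text.class, Text.class, IntWritable.class, new Configuration(false));
    ChainMapper.addMapper(job, UpperCaser.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
    ChainReducer.setReducer(job, Reduce.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
    System.exit(job.waitForCompletion(true) ? 0 : 1);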
     
  • FieldSelectionMapper and FieldSelectionReducer    This pair can select fields (like the Unix cut command) from the input keys and values and emit them as output keys and values; the key/value field specs used below are explained after the example.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionReducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    /**
    * Demonstrate how to use FieldSelectionMapper and FieldSelectionReducer.
    * It implements a job that can be used to perform field selections in a
    * manner similar to unix cut.
    * <p/>
    * User: George Sun
    * Date: 7/4/13
    * Time: 9:44 PM
    */
    public class FieldSelectionMRExample extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
    if (args.length != 2) {
    JobBuilder.printUsage(this, "<input> <output>");
    return -1;
    }
    Configuration conf = getConf();
    conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "-");
    conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "6,5,1-3:0-");
    conf.set(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, ":4,3,2,1,0,0-");
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    job.setMapperClass(FieldSelectionMapper.class);
    job.setReducerClass(FieldSelectionReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // 1 reducer is ok for this job.
    job.setNumReduceTasks(1);
    return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new FieldSelectionMRExample(), args);
    System.exit(exitCode);
    }
    }
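     The specs above follow the FieldSelectionHelper grammar "keySpec:valueSpec": each spec is a comma-separated list of zero-based field positions, "m-n" is an inclusive range, and "n-" means field n and everything after it; records are split on the separator configured through DATA_FIELD_SEPERATOR ("-" here). A hedged walk-through of the map-side spec "6,5,1-3:0-" on a made-up record (not verified against a running cluster):
    // Hypothetical input record, fields split on "-":   f0-f1-f2-f3-f4-f5-f6-f7
    // Map output key   (spec "6,5,1-3"): fields 6, 5, then 1 through 3   ->  f6-f5-f1-f2-f3
    // Map output value (spec "0-")     : field 0 and everything after    ->  f0-f1-f2-f3-f4-f5-f6-f7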

     The JobBuilder class:
    // == JobBuilder
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    public class JobBuilder {
    private final Class<?> driverClass;
    private final Job job;
    private final int extraArgCount;
    private final String extrArgsUsage;
    private String[] extraArgs;
    public JobBuilder(Class<?> driverClass) throws IOException {
    this(driverClass, 0, "");
    }
    public JobBuilder(Class<?> driverClass, int extraArgCount, String extrArgsUsage) throws IOException {
    this.driverClass = driverClass;
    this.extraArgCount = extraArgCount;
    this.job = new Job();
    this.job.setJarByClass(driverClass);
    this.extrArgsUsage = extrArgsUsage;
    }
    // vv JobBuilder
    public static Job parseInputAndOutput(Tool tool, Configuration conf,
    String[] args) throws IOException {
    if (args.length != 2) {
    printUsage(tool, "<input> <output>");
    return null;
    }
    Job job = new Job(conf);
    job.setJarByClass(tool.getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job;
    }
    public static void printUsage(Tool tool, String extraArgsUsage) {
    System.err.printf("Usage: %s [genericOptions] %s\n\n",
    tool.getClass().getSimpleName(), extraArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(System.err);
    }
    // ^^ JobBuilder
    public JobBuilder withCommandLineArgs(String... args) throws IOException {
    Configuration conf = job.getConfiguration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    String[] otherArgs = parser.getRemainingArgs();
    if (otherArgs.length < 2 || otherArgs.length > 3 + extraArgCount) {
    System.err.printf("Usage: %s [genericOptions] [-overwrite] <input path> <output path> %s\n\n",
    driverClass.getSimpleName(), extrArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(System.err);
    System.exit(-1);
    }
    int index = 0;
    boolean overwrite = false;
    if (otherArgs[index].equals("-overwrite")) {
    overwrite = true;
    index++;
    }
    Path input = new Path(otherArgs[index++]);
    Path output = new Path(otherArgs[index++]);
    if (index < otherArgs.length) {
    extraArgs = new String[otherArgs.length - index];
    System.arraycopy(otherArgs, index, extraArgs, 0, otherArgs.length - index);
    }
    if (overwrite) {
    output.getFileSystem(conf).delete(output, true);
    }
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    return this;
    }
    public Job build() {
    return job;
    }
    public String[] getExtraArgs() {
    return extraArgs;
    }
    }

     
  • IntSumReducer, LongSumReducer      Reducers that sum IntWritable or LongWritable values, respectively, to produce a total for every key. They are easy to follow; IntSumReducer looks like this, and a usage sketch follows the code.
    public static class IntSumReducer
    extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
    Context context
    ) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
    sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
    }
    }
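     A hedged usage sketch: LongSumReducer wired in as both combiner and reducer for a job whose map output values are LongWritable counts. The new-API class lives in org.apache.hadoop.mapreduce.lib.reduce in recent releases (like the other new-API lib classes mentioned above, its availability in 1.1.1 may vary), and MyCountMapper is a hypothetical mapper; untested here.
    // Hedged sketch: library LongSumReducer as combiner and reducer.
    job.setMapperClass(MyCountMapper.class);        // hypothetical mapper emitting (Text, LongWritable)
    job.setCombinerClass(LongSumReducer.class);     // pre-aggregates counts on the map side
    job.setReducerClass(LongSumReducer.class);      // produces the final per-key totals
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);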
     
  • InverseMapper    A mapper that swaps keys and values. The library implementation is short enough to quote in full; a usage sketch follows it.
    /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.hadoop.mapreduce.lib.map;
    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Mapper;
    /** A {@link Mapper} that swaps keys and values. */
    public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
    /** The inverse function.  Input keys and values are swapped.*/
    @Override
    public void map(K key, V value, Context context
    ) throws IOException, InterruptedException {
    context.write(value, key);
    }
    }
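     A common use, sketched with assumptions: a follow-up job that flips (word, count) pairs into (count, word) with InverseMapper so the shuffle sorts by count. SortDriver, countsPath and sortedPath are hypothetical names; untested here.
    // Hedged sketch: sort word counts by count using InverseMapper as the map phase.
    // Assumes the counts were written as a SequenceFile of (Text, IntWritable).
    Job sortJob = new Job(conf, "sort-by-count");
    sortJob.setJarByClass(SortDriver.class);                      // hypothetical driver class
    FileInputFormat.addInputPath(sortJob, countsPath);            // hypothetical path holding the counts
    sortJob.setInputFormatClass(SequenceFileInputFormat.class);
    sortJob.setMapperClass(InverseMapper.class);                  // emits (count, word)
    sortJob.setNumReduceTasks(1);                                 // one reducer => one globally sorted file
    sortJob.setOutputKeyClass(IntWritable.class);
    sortJob.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(sortJob, sortedPath);          // hypothetical output path
    sortJob.waitForCompletion(true);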
     
  • MultithreadedMapper     A mapper that runs the configured mapper concurrently in separate threads. MultithreadedMapper is useful for mappers that are not CPU-bound, for example when each record triggers a network call (such as fetching a web page) whose latency is far higher than local I/O; in that case you are not blocked on a single network call and can keep processing as data becomes available. If the data is large and sits in HDFS, it is read locally anyway, and when the computation is CPU-bound a multi-core, multi-process solution helps more. You also have to make sure your mapper is thread safe. A code snippet of the job setup (a thread-safe mapper sketch follows it):
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setMapperClass(MultithreadedMapper.class);
    MultithreadedMapper.setMapperClass(job, WebGraphMapper.class);
    MultithreadedMapper.setNumberOfThreads(job, 8);
    //conf.set("mapred.map.multithreadedrunner.class", //WebGraphMapper.class.getCanonicalName());
    //conf.set("mapred.map.multithreadedrunner.threads", "8");
    job.setJarByClass(WebGraphMapper.class);
    // rest omitted
    job.waitForCompletion(true);
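     The snippet sets MultithreadedMapper as the job's mapper and points it at the real mapper class with eight worker threads. Below is a hedged sketch of what a thread-safe WebGraphMapper might look like; WebGraphMapper is just the name used in the snippet, the fetch logic is illustrative, and it has not been run here.
    // Hedged sketch of a mapper meant to run under MultithreadedMapper.
    // All per-record state lives in local variables, so concurrent map() calls from
    // MultithreadedMapper's worker threads do not interfere with one another.
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URL;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class WebGraphMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String url = value.toString().trim();                 // assumes one URL per input line
        int length = fetchPage(url).length();                 // blocking network call; the reason for threads
        context.write(new Text(url), new IntWritable(length));
      }
      // Simple illustrative fetch; a real job would add time-outs and error handling.
      private String fetchPage(String url) throws IOException {
        StringBuilder page = new StringBuilder();
        BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
        try {
          String line;
          while ((line = in.readLine()) != null) {
            page.append(line).append('\n');
          }
        } finally {
          in.close();
        }
        return page.toString();
      }
    }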
     

  • TokenCounterMapper    A mapper that tokenizes the input value into words (using Java's StringTokenizer) and emits each word along with a count of one. The library implementation is shown below, followed by a driver sketch.
    /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.hadoop.mapreduce.lib.map;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    /**
    * Tokenize the input values and emit each word with a count of 1.
    */
    public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    @Override
    public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
    }
    }
    }
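     A hedged driver sketch: combined with the IntSumReducer shown earlier (or an equivalent of your own), TokenCounterMapper gives a complete word count with no user-written map or reduce code. LibraryWordCount is a hypothetical driver class name, FileInputFormat/FileOutputFormat are the new-API lib.input/lib.output classes, and the snippet has not been run here.
    // Hedged sketch: word count built entirely from library classes.
    Configuration conf = new Configuration();
    Job job = new Job(conf, "library-wordcount");
    job.setJarByClass(LibraryWordCount.class);          // hypothetical driver class
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(TokenCounterMapper.class);       // library mapper: emits (word, 1)
    job.setCombinerClass(IntSumReducer.class);          // library reducer used as combiner
    job.setReducerClass(IntSumReducer.class);           // sums the ones per word
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);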
     

  • RegexMapper    A mapper that finds matches of a regular expression in the input value and emits each match along with a count of one. The new-API version of this class is also missing from Hadoop 1.1.1, but the old-API class shown here is enough to illustrate the idea; a driver sketch follows it.
    /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.hadoop.mapred.lib;
    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** A {@link Mapper} that extracts text matching a regular expression. */
    public class RegexMapper<K> extends MapReduceBase
    implements Mapper<K, Text, Text, LongWritable> {
    private Pattern pattern;
    private int group;
    public void configure(JobConf job) {
    pattern = Pattern.compile(job.get("mapred.mapper.regex"));
    group = job.getInt("mapred.mapper.regex.group", 0);
    }
    public void map(K key, Text value,
    OutputCollector<Text, LongWritable> output,
    Reporter reporter)
    throws IOException {
    String text = value.toString();
    Matcher matcher = pattern.matcher(text);
    while (matcher.find()) {
    output.collect(new Text(matcher.group(group)), new LongWritable(1));
    }
    }
    }
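     A hedged driver sketch in the same old API, in the spirit of Hadoop's Grep example: RegexMapper emits (match, 1) and the old-API LongSumReducer (org.apache.hadoop.mapred.lib.LongSumReducer) totals the matches. GrepCount is a hypothetical driver class, the snippet is assumed to live inside a Tool's run() method, and it has not been run here.
    // Hedged sketch: count occurrences of a regex across the input (old API).
    JobConf grepJob = new JobConf(getConf(), GrepCount.class);   // hypothetical driver class
    grepJob.setJobName("grep-count");
    FileInputFormat.setInputPaths(grepJob, args[0]);
    FileOutputFormat.setOutputPath(grepJob, new Path(args[1]));
    grepJob.setMapperClass(RegexMapper.class);
    grepJob.set("mapred.mapper.regex", args[2]);                 // the pattern to match
    // grepJob.setInt("mapred.mapper.regex.group", 1);           // optionally emit a capture group instead
    grepJob.setCombinerClass(LongSumReducer.class);              // old-API org.apache.hadoop.mapred.lib.LongSumReducer
    grepJob.setReducerClass(LongSumReducer.class);
    grepJob.setOutputKeyClass(Text.class);
    grepJob.setOutputValueClass(LongWritable.class);
    JobClient.runJob(grepJob);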
     
