设为首页 收藏本站
查看: 644|回复: 0

[经验分享] Hadoop Map/Reduce框架

[复制链接]
发表于 2016-12-7 06:51:39 | 显示全部楼层 |阅读模式
  Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。WordCount.java

package org.myorg;//包含在包myorg中
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {//定义映射map类
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//定义对象word用来包含需要统计的字符串
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//定义具体的映射方法,传入参数key,value(要统计的字符串),输出参数output(每个单词出现的个数统计映射方式),reporter
String line = value.toString();//一行一行的读入传入的字符串
StringTokenizer tokenizer = new StringTokenizer(line);//以空格分隔符将一行分为若干tokens
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);//统计相同单词出现的次数
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
//定义reduce类,将每个key(本例中就是单词)出现的次数求和
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//定义具体的reduce方法,将每次映射中对应每个单词出现的次数进行统计求和,输入参数有key,values,输出参数有output ,reporter
int sum = 0;//初始化总数
while (values.hasNext()) {//将两次映射中对应的次数进行求和
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));//输出output得到最终结果
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);//定义JobCon的配置变量conf,代表一个Map/Reduce作业的配置。
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);//框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的 Comparator。
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);//Mapper已经排好序的输出
conf.setCombinerClass(Reduce.class);//用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量。
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));//定义文件的输入格式,指定一组输入文件
FileOutputFormat.setOutputPath(conf, new Path(args[1]));//定义文件的输出格式,输出文件应该写在哪儿
JobClient.runJob(conf);//提交作业并且监控它的执行
}
}
package org.myorg;
import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool  {//定义单词统计WordCount类
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,
Text,IntWritable>{//定义映射map类
static enum Counters{ INPUT_WORDS }

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//定义对象word用来包含需要统计的字符串
//初始化每个统计变量
private boolean caseSensitive = true;
private Set<string> patternsToSkip = new HashSet<String>();

private long numRecords = 0;
private String inputFile;

public void configure(JobConf job) {//修改配置参数
caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
inputFile = job.get("map.input.file");
if (job.getBoolean("wordcount.skip.patterns", false)){
path[] patternsFiles = new Path[0];
try{
patternsFiles = DistributedCache.getLocalCacheFiles(job);//使用DistributedCache 来分发只读数据。 这里允许用户指定单词的模式,在计数时忽略那些符合模式的单词
} catch (IOException ioe){
System.err.println("Caught   exception   while  getting  cached  files:  "  +
StringUtils.stringifyException(ioe));
}
for (Path patternsFile : patternsFiles){
parseSkipFile(patternsFile);
}
}
}  
private void parseSkipFile(Path patternsFile) {//从文件中读入字符串
try {
BufferedReader fis = new BufferedRdader(new FileReader(patternsFile.toString()));//读入数据
String pattern = null;
while ((pattern = fis.readLine()) != null){
patternsToSkip.add(pattern);//将读入数据保存到patternsToSkip
}
}catch (IOException ioe){
System.err.println("Caught  exception  while  parsing  the  cached  file  '"  +
patternsFile + "' :" + StringUtils.stringifyException(ioe));
}
}

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable>
output, Reporter reporter) throws IOException {
String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();//一行一行的读入传入的字符串

for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}  

StringTokenizer tokenizer = new StringTokenizer(line);//以空格分隔符将一行分为若干tokens
while (tokenizer.hasMoreTokens()) {//统计相同单词出现的次数
word.set(tokenizer.nextToken());
output.collect(word, one);//统计相同单词出现的次数
reporter.incrCounter(Counters.INPUT_WORDS, 1);//遇到相同的进行+1操作
}

if((++numRecords % 100) == 0) {//打印最后的统计结果
reporter.setStatus("Finished processing "+ numRecords + " records" + "from the input file: "+ inputFile);
}
}
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
Text, IntWritable> {//定义映射之间总数的统计
public void   reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {////定义具体的reduce方法,将每次映射中对应每个单词出现的次数进行统计求和,输入参数有key,values,输出参数有output
int sum = 0;//初始化总数  
while (values.hasNext()) {//将两次映射中对应的次数进行求和  
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));//输出output得到最终结果  
}
}

public int run(String[] args) throws Exception {//执行统计方法run
JobConf conf = new JobConf(getConf(), WordCount.class);//定义JobCon的配置变量conf,代表一个Map/Reduce作业的配置。
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);//框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的 Comparator。
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);;//Mapper已经排好序的输出
conf.setCombinerClass(Reduce.class);//用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量。
conf.setReducerClass(Reduce.class);

conf.setIntputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutpotFormat.class);

List<String> other_args = new ArrayList<String>();
for (int i=0; i < args.length; ++i) {
if ("-skip".equals(args)) {
DistributedCache.addCacheFile(new path(args[++i]).toUri(), conf);//使用DistributedCache 来分发只读数据。 这里允许用户指定单词的模式,在计数时忽略那些符合模式的单词
conf.setBoolean("wordcount.skip.patterns", true);
} else{
other_args.add(args);
}
}

FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));//定义文件的输入格式,指定一组输入文件
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));//定义文件的输出格式,输出文件应该写在哪儿

JobClient.runJob(conf);//提交作业并且监控它的执行  
return 0;
}

public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new WordCount(), args);//执行wordcount统计
System.exit(res);
}
}
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310578-1-1.html 上篇帖子: hadoop目前遇到的问题 下篇帖子: hadoop源码-包结构
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表