设为首页 收藏本站
查看: 783|回复: 0

[经验分享] Hadoop MapReduce任务的启动分析

[复制链接]

尚未签到

发表于 2016-12-9 08:51:23 | 显示全部楼层 |阅读模式
 

正常情况下,我们都是启动Hadoop任务的方式大概就是通过hadoop jar命令(或者写在shell中),事实上运行的hadoop就是一个包装的.sh,下面就是其中的最后一行,表示在其中执行一个java命令,调用hadoop的一些主类,同时配置一些hadoop的相关CLASSPATH,OPTS等选项:

 

exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
 

 

当使用hadoop jar时,调用的$CLASS是下面的类型:

 

org.apache.hadoop.util.RunJar
 

 

而通过hadoop jar调用的主类,必须满足条件:

 

1,其中有main方法,类似下面的定义:

public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new ThisClass(), args);
System.exit(result);
}
 

 

2. ToolRunner中的的类需要有如下签名:

 

extends Configured implements Tool


 

并实现其中的public int run方法,在进行必要的hadoop job构造后,执行job的方法,同步等待执行结果并返回即可。

 

boolean success = job2.waitForCompletion(true);

 

 

大体的过程如下,以前也没有对整个过程进行质疑,直到我们有新的需要,在其他的客户端(java,而不是shell中)启动MapReduce任务,顺带好好看了这个函数waitForCompletion...

public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}

 

 

读完源码后发现,其实这个方法主要的目的就是看一下当前job的状态,如果没有提交,那么就执行submit操作(同步)将其提交到集群上。传递的参数verbose,如果是true,就是表示需要检测并打印job的相关信息(使用LOG.info()来打印到console中);否则,就等待任务的complete,反正这是个同步的操作;我们如果不需要监测任务的执行状态,仅仅进行一步submit就可以了。

 

那么就看一下monitorAndPrintJob这个函数吧,核心代码如下:

 

while (!isComplete() || !reportedAfterCompletion) {
if (isComplete()) {
reportedAfterCompletion = true;
} else {
Thread.sleep(progMonitorPollIntervalMillis);
}
if (status.getState() == JobStatus.State.PREP) {
continue;
}      
if (!reportedUberMode) {
reportedUberMode = true;
LOG.info("Job " + jobId + " running in uber mode : " + isUber());
}      
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
StringUtils.formatPercent(reduceProgress(), 0));
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
}
TaskCompletionEvent[] events =
getTaskCompletionEvents(eventCounter, 10);
eventCounter += events.length;
printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
}
boolean success = isSuccessful();
if (success) {
LOG.info("Job " + jobId + " completed successfully");
} else {
LOG.info("Job " + jobId + " failed with state " + status.getState() +
" due to: " + status.getFailureInfo());
}
Counters counters = getCounters();
if (counters != null) {
LOG.info(counters.toString());
}
return success;
 

其实就是定时循环去报告,检查状态,其中涉及到map和reduce的总体进度(通过某种算法计算出来的百分比),如果报告与上一次有变化,就进行输出。直到任务执行完成,并将其中的所有Counter均打印出来;如果任务失败,打印出任务执行失败的原因。

 

最终,MapReduce的执行日志大概就是这个样子:

 

15/04/13 15:01:08 INFO mapreduce.Job:  map 96% reduce 28%
15/04/13 15:01:09 INFO mapreduce.Job:  map 98% reduce 28%
15/04/13 15:01:10 INFO mapreduce.Job:  map 98% reduce 32%
15/04/13 15:01:13 INFO mapreduce.Job:  map 100% reduce 33%
15/04/13 15:01:16 INFO mapreduce.Job:  map 100% reduce 37%
15/04/13 15:01:19 INFO mapreduce.Job:  map 100% reduce 46%
15/04/13 15:01:22 INFO mapreduce.Job:  map 100% reduce 54%
15/04/13 15:01:25 INFO mapreduce.Job:  map 100% reduce 62%
15/04/13 15:01:28 INFO mapreduce.Job:  map 100% reduce 68%
15/04/13 15:01:31 INFO mapreduce.Job:  map 100% reduce 71%
15/04/13 15:01:34 INFO mapreduce.Job:  map 100% reduce 76%
15/04/13 15:01:35 INFO mapreduce.Job:  map 100% reduce 100%
15/04/13 15:01:37 INFO mapreduce.Job: Job job_1421455790417_222365 completed successfully
15/04/13 15:01:37 INFO mapreduce.Job: Counters: 46
File System Counters
FILE: Number of bytes read=70894655
FILE: Number of bytes written=158829484
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=5151416348
HDFS: Number of bytes written=78309
HDFS: Number of read operations=1091
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
 

 

 

如果我们需要将任务执行进度打印出来,就可以对这部分的功能就行改进并重写。

 

如果任务已经提交到集群,可以使用job对象的getTrackingURL()通过页面的形式查看到其具体详情,其中job对象还提供了一些可以操作集群任务的API,包括killTask, failTask等。

 

在任务执行完成后,就可以得到任务的所有Counter,使用Counter来对任务的各项指标进行详细统计是非常易用有效的方式,我们在任务中定义了大量的Counter来进行该操作(包括以后以后可能会评估任务的消耗,以便进行费用统计等…)。

 

如果需要启动多个任务,或以某种依赖的方式启动多个顺序MapReduce任务,可以使用JobControl来链接多个任务,JobControl的run方法,会根据任务的依赖关系来调度整个过程,并提供了一些常用的API,同样可以将任务kill/fail掉。但是如果流程的复杂性稍微比较高的情况下,建议使用一套工作流系统,例如oozie,便于管理以及应对流程上的变化。 

 

 

 

 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311700-1-1.html 上篇帖子: 【Hadoop八】Yarn的资源调度策略 下篇帖子: Hadoop 统计专利被那些专利所引用(一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表