夜勿眠 发表于 2016-12-13 09:25:31

大数据框架hadoop的作业初始化过程(接上编)

    本文接上一编文章《大数据框架hadoop的作业提交过程》。调度器调用JobTracker.initJob()函数对新作业进行初始化。相关代码如下:
// 调度器调用eagerTaskInitializationListener.start()方法。
class JobQueueTaskScheduler extends TaskScheduler { 
 @Override
  public synchronized void start() throws IOException {
super.start();
... ...
eagerTaskInitializationListener.start();
... ...
}
}
// EagerTaskInitializationListener.start()方法启动作业管理器线程。
class EagerTaskInitializationListener extends JobInProgressListener {
  ... ...
  public void start() throws IOException {
    this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
    ... ...
    this.jobInitManagerThread.start();
  }
  ... ...
}
// 作业初始化管理器执行作业初始化动作。
class JobInitManager implements Runnable {
    public void run() {
      ... ...
      threadPool.execute(new InitJob(job));
      ... ...
    }
}
作业初始化的主要工作是构造Map Task和Reduce Task并对它们进行初始化。
Hadoop将每个作业分解成4种类型的任务,分别是Setup Task、Map Task、Reduce Task和Cleanup Task。它们的运行时信息由TaskInProgress类维护,因此,创建这些任务实际上是创建TaskInProgress对象。
上述4种任务的作用及创建过程如下。
n Setup Task:作业初始化标识性任务。它进行一些非常简单的作业初始化工作,比如将运行状态设置为“setup”,调用OutputCommitter.setupJob()函数等。该任务运行完后,作业由PREP状态变为RUNNING状态,并开始运行Map Task。该类型任务又被分为Map Setup Task和Reduce Setup Task两种,且每个作业各有一个。它们运行时分别占用一个Map slot和Reduce slot。由于这两种任务功能相同,因此有且只有一个可以获得运行的机会(即只要有一个开始运行,另一个马上被杀掉,而具体哪一个能够运行,取决于当时存在的空闲slot种类及调度策略。相关代码如下:
public class JobInProgress {
  TaskInProgress setup[] = new TaskInProgress;
  ... ...
  public synchronized void initTasks() {
    ... ...
    // create two setup tips, one map and one reduce.
    setup = new TaskInProgress;
    // setup map tip. This map doesn't use any split. Just assign an empty
    // split.
    setup = new TaskInProgress(jobId, jobFile, emptySplit, 
            jobtracker, conf, this, numMapTasks + 1, 1);
    setup.setJobSetupTask();
    // setup reduce tip.
    setup = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks + 1, jobtracker, conf, this, 1);
setup.setJobSetupTask();
... ...
  }
}
 
n Map Task:Map阶段处理数据的任务。其数目及对应的处理数据分片由应用程序中的
InputFormat组件确定。关代码如下:
public class JobInProgress {
  TaskInProgress maps[] = new TaskInProgress;
  ... ...
  public synchronized void initTasks() {
    // read input splits and create a map per a split
TaskSplitMetaInfo[] splits = createSplits(jobId);
numMapTasks = splits.length;
    ... ...
    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, splits[i], 
                                   jobtracker, conf, this, i, numSlotsPerMap);
}
... ...
  }
}
n Reduce Task:Reduce阶段处理数据的任务。其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定。考虑到Reduce Task能否运行依赖于Map Task的输出结果,因此,Hadoop刚开始只会调度Map Task,直到Map Task完成数目达到一定比例(由参数mapred.reduce.slowstart.completed.maps指定,默认是0.05,即5%)后,才开始调度Reduce Task。关代码如下:
public class JobInProgress {
  TaskInProgress reduces[] = new TaskInProgress;
  ... ...
  public synchronized void initTasks() {
    ... ...
    // Create reduce tasks
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, 
        jobtracker, conf, this, numSlotsPerReduce);
      nonRunningReduces.add(reduces[i]);
}
... ...
}
n Cleanup Task:作业结束标志性任务,主要完成一些清理工作,比如删除作业运行过程中用到的一些临时目录(比如_temporary目录)。一旦该任务运行成功后,作业由RUNNING状态变为SUCCESSED状态。关代码如下:
public class JobInProgress {
  TaskInProgress cleanup[] = new TaskInProgress;
  ... ...
  public synchronized void initTasks() {
    ... ...
    // create cleanup two cleanup tips, one map and one reduce.
    cleanup = new TaskInProgress;
    // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    cleanup = new TaskInProgress(jobId, jobFile, emptySplit, 
            jobtracker, conf, this, numMapTasks, 1);
    cleanup.setJobCleanupTask();
    // cleanup reduce tip.
    cleanup = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks, jobtracker, conf, this, 1);
cleanup.setJobCleanupTask();
... ...
}
 
页: [1]
查看完整版本: 大数据框架hadoop的作业初始化过程(接上编)