dew 发表于 2016-12-6 10:34:16

Hadoop Inside (3)

  之前的MapReduce Demo只能在一台机器上运行,现在是时候让它分布式运行了。在对MapReduce的运行流程和FileSystem进行了简单研究之后,现在尝试从配置着手,看看怎样让Hadoop在两台机器上面同时运行MapReduce。
  首先看回这里
  String tracker = conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = (JobSubmissionProtocol)
RPC.getProxy(JobSubmissionProtocol.class,
JobTracker.getAddress(conf), conf);
}
  当tracker地址不为local,则tracker为Remote Client的 JobTracker 类,这里重点分析。
  JobTracker有一个main函数,注释显示它仅仅用于调试,正常情况是作为DFS Namenode进程的一部分来运行。不过这里我们可以先从它着手开始分析。
  tracker = new JobTracker(conf); //构造
  构造函数先获取一堆常量的值,然后清空'systemDir',接着启动RPC服务器。
  InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
this.port = addr.getPort();
this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
this.interTrackerServer.start();
  启动TrackInfoServer:
  this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
this.infoServer = new JobTrackerInfoServer(this, infoPort);
this.infoServer.start();
  TrackInfoServer 提供了通过HTTP方式获取JobTracker信息的方式,可以方便用于监测工作任务的进度。
  启动三个守护线程:
  new Thread(this.expireTrackers).start(); //Used to expire TaskTrackers that have gone down
new Thread(this.retireJobs).start(); //Used to remove old finished Jobs that have been around for too long
new Thread(this.initJobs).start(); //Used to init new jobs that have just been created
  三个线程的用处已经注释,这里不作分析。下面开始分析 JobTracker.submitJob()
  之前已经分析过 LocalJobRunner.submitJob(),它实例化内部类Job,在里面实现MapReduce流程。JobTracker就复杂一些,它实例化 JobInProgress,然后将这个Job提交到队列:
  JobInProgress job = new JobInProgress(jobFile, this, this.conf);
synchronized (jobs) {
synchronized (jobsByArrival) {
synchronized (jobInitQueue) {
jobs.put(job.getProfile().getJobId(), job);
jobsByArrival.add(job);
jobInitQueue.add(job);
jobInitQueue.notifyAll();
}
}
}
  此时RetireJobs线程开始处理超时和出错的Job,JobInitThread线程初始化工作任务: job.initTasks();
  开始分析 JobInProgress
  在构造函数中,Tracker从发起端的DFS获取任务文件(xml和jar),然后保存到本地目录下面
  JobConf default_job_conf = new JobConf(default_conf);
this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
jobid + ".xml");
this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
jobid + ".jar");
FileSystem fs = FileSystem.get(default_conf);
fs.copyToLocalFile(new File(jobFile), localJobFile);

conf = new JobConf(localJobFile);
this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,
conf.getJobName());
String jarFile = conf.getJar();
if (jarFile != null) {
fs.copyToLocalFile(new File(jarFile), localJarFile);
conf.setJar(localJarFile.getCanonicalPath());
}

  这里要注意jarFile,JobConf的构造函数:
  public JobConf(Configuration conf, Class aClass) {
this(conf);
String jar = findContainingJar(aClass);
if (jar != null) {
setJar(jar);
}
}
  如果 aClass 是在一个jar里面,那么setJar(jar);就会被执行,这个jar会被copy到 LocalJobRunner 或是 JobTracker 的工作目录下面。所以这里有一个原则: 将要执行的MapReduce操作的所有class打包到一个jar中,这样才能执行分布式的MapReduce计算。
  再看 JobInProgress.initTasks()
  先从Jar中加载InputFormat
  String ifClassName = jd.get("mapred.input.format.class");
InputFormat inputFormat;
if (ifClassName != null && localJarFile != null) {
try {
ClassLoader loader =
new URLClassLoader(new URL[]{ localJarFile.toURL() });
Class inputFormatClass = loader.loadClass(ifClassName);
inputFormat = (InputFormat)inputFormatClass.newInstance();
} catch (Exception e) {
throw new IOException(e.toString());
}
} else {
inputFormat = jd.getInputFormat();
}
  接下来对文件块的大小进行排序
  创建对应的Map任务
  this.numMapTasks = splits.length;
// create a map task for each split
this.maps = new TaskInProgress;
for (int i = 0; i < numMapTasks; i++) {
maps = new TaskInProgress(jobFile, splits, jobtracker, conf, this);
}
  创建Reduce任务
  this.reduces = new TaskInProgress;
for (int i = 0; i < numReduceTasks; i++) {
reduces = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);
}
  最后对于每Split的信息进行缓存,并且创建状态类
  for (int i = 0; i < maps.length; i++) {
String hints[][] = fs.getFileCacheHints(splits.getFile(), splits.getStart(), splits.getLength());
cachedHints.put(maps.getTIPId(), hints);
}

this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
  现在轮到 TaskInProgress,它将Job里面的Map和Reduce操作进行了封装,但是JobInProgress.initTasks()仅仅对task进行了初始化,并没有执行Task,经过一番跟踪,发现Task的执行,是由 TaskTracker 来处理。
  TaskTracker,实现了TaskUmbilicalProtocol接口。在之前的文章中,LocalJobRunner的内部类Job也实现了这个接口,这里对比一下:
  接口 JobSubmissionProtocol: LocalJobRunner <---> JobTracker
接口 TaskUmbilicalProtocol:LocalJobRunner.Job <---> TaskTracker
  下面对TaskTracker进行分析,首先也是从main入口开始。
  TaskTracker实现了Runnable,main实例化TaskTracker对象,然后执行run()方法。
  在构造函数中,主要进行初始化
  this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
initialize();
  initialize()里面,初始化一些变量值 ,然后初始化RPC服务器:
  while (true) {
try {
this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
this.taskReportServer.start();
break;
} catch (BindException e) {
LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
this.taskReportPort++;
}

}
while (true) {
try {
this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
this.mapOutputServer.start();
break;
} catch (BindException e) {
LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");
this.mapOutputPort++;
}
}
  mapOutputServer使用一个循环来尝试各个端口绑定。
  最后一句
this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
  这里有一个新的接口InterTrackerProtocol,是TaskTracker和中央JobTracker通讯用的协议。通过这个接口, TaskTracker可以用来执行JobTracker中的Task了。接下来分析TaskServer的主流程,run()函数。
  run()中, 有两个while循环。在内部while循环里面,执行 offerService() 方法。它里面也是一个while循环,开始几段代码用于JobTracker的心跳监测。接下来,它通过协议接口调用JobTracker,获取Task并执行:
  if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
Task t = jobClient.pollForNewTask(taskTrackerName);
if (t != null) {
TaskInProgress tip = new TaskInProgress(t, this.fConf);
synchronized (this) {
tasks.put(t.getTaskId(), tip);
if (t.isMapTask()) {
mapTotal++;
} else {
reduceTotal++;
}
runningTasks.put(t.getTaskId(), tip);
}
tip.launchTask();
}
}
  tip.launchTask(); 开始执行这个Task,在方法内部:
  this.runner = task.createRunner(TaskTracker.this);
this.runner.start();
  Task 有两个子类 MapTask和ReduceTask,它们的createRunner()方法都会创建一个TaskRunner的子类,TaskRunner继承Thread,run()方法中:
  String sep = System.getProperty("path.separator");
File workDir = new File(new File(t.getJobFile()).getParent(), "work");
workDir.mkdirs();

StringBuffer classPath = new StringBuffer();
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
  JobConf job = new JobConf(t.getJobFile());
String jar = job.getJar();
if (jar != null) { // if jar exists, it into workDir
unJar(new File(jar), workDir);
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.append(sep); // add libs from jar to classpath
classPath.append(libs);
}
}
classPath.append(sep);
classPath.append(new File(workDir, "classes"));
classPath.append(sep);
classPath.append(workDir);
}

  获取工作目录,获取classpath。然后解压工作任务的jar包。
  // Build exec child jmv args.
Vector vargs = new Vector(8);
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");

vargs.add(jvm.toString());
  String javaOpts = handleDeprecatedHeapSize(
job.get("mapred.child.java.opts", "-Xmx200m"),
job.get("mapred.child.heap.size"));
javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;
javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit);
}

// Add classpath.
vargs.add("-classpath");
vargs.add(classPath.toString());
// Add main class and its arguments
vargs.add(TaskTracker.Child.class.getName()); // main of Child
vargs.add(tracker.taskReportPort + ""); // pass umbilical port
vargs.add(t.getTaskId()); // pass task identifier
// Run java
runChild((String[])vargs.toArray(new String), workDir);
  这里是构造启动Java进程的classpath和其它vm参数,最后在 runChild 中开一个子进程来执行这个Task。感觉够复杂的。
  最后分析TaskTracker的内部类Child。它就是上面子进程执行的类。在main函数中
  TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
new InetSocketAddress(port), conf);

Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());

conf.addFinalResource(new File(task.getJobFile()));
  可见该子进程也是通过RPC跟TaskTracker进行通讯。
  startPinging(umbilical, taskid); // start pinging parent
  开一个进程,对TaskTracker进行心跳监测。
  String workDir = job.getWorkingDirectory();
if (workDir != null) {
FileSystem file_sys = FileSystem.get(job);
file_sys.setWorkingDirectory(new File(workDir));
}
task.run(job, umbilical); // run the task
  这里才真正开始执行Task。

  分析到此告一段落,下面开始构造一个分布式执行的环境。
页: [1]
查看完整版本: Hadoop Inside (3)