Hadoop TaskTracker Analysis (reposted)
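I described the TaskTracker's responsibilities earlier. It maintains, requests and monitors Tasks, and it communicates with the JobTracker through heartbeats. The TaskTracker's init process is as follows: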
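1. Read the configuration files and parse the parameters.
2. Delete the user's existing local files on the TaskTracker, then create fresh directories and files.
3. Clear the tasks map, declared as Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
4. this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>(); a linked map that records the running tasks.
this.runningJobs = new TreeMap<JobID, RunningJob>(); records the running jobs, keyed by job ID.
5. Initialize the JvmManager: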
// One JvmManagerForType per task type; the boolean argument is true for the map pool
mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
    true, tracker);
reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
    false, tracker);
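To make the role of the two managers concrete, here is a minimal Java sketch under a deliberately simplified model: each JvmManagerForType is reduced to a capped set of live child JVMs, one per task, and JVM reuse is ignored. The classes JvmPoolSketch and JvmManagerSketch and all their methods are hypothetical names for illustration, not Hadoop's API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for one JvmManagerForType: a capped pool of child JVMs
// for a single task type. JVM reuse across tasks is deliberately ignored.
class JvmPoolSketch {
    private final int maxJvms;    // plays the role of getMaxCurrentMapTasks()/ReduceTasks()
    private final boolean isMap;  // true for the map pool, false for the reduce pool
    private final Set<String> liveJvms = new HashSet<>(); // one entry per running child JVM

    JvmPoolSketch(int maxJvms, boolean isMap) {
        this.maxJvms = maxJvms;
        this.isMap = isMap;
    }

    boolean isMap() {
        return isMap;
    }

    /** Notionally spawns a JVM for the task; refuses when the pool is full. */
    synchronized boolean spawnFor(String taskId) {
        if (liveJvms.size() >= maxJvms || liveJvms.contains(taskId)) {
            return false;
        }
        liveJvms.add(taskId);
        return true;
    }

    /** Called when the child JVM running the task exits, freeing its place. */
    synchronized void jvmExited(String taskId) {
        liveJvms.remove(taskId);
    }

    synchronized int liveCount() {
        return liveJvms.size();
    }
}

// Hypothetical stand-in for JvmManager: one pool per task type.
class JvmManagerSketch {
    final JvmPoolSketch mapJvmManager;
    final JvmPoolSketch reduceJvmManager;

    JvmManagerSketch(int maxMapTasks, int maxReduceTasks) {
        mapJvmManager = new JvmPoolSketch(maxMapTasks, true);
        reduceJvmManager = new JvmPoolSketch(maxReduceTasks, false);
    }

    /** Routes a task to the pool for its type. */
    JvmPoolSketch poolFor(boolean isMapTask) {
        return isMapTask ? mapJvmManager : reduceJvmManager;
    }
}
```

Keeping separate pools means a burst of map tasks can never use up the JVMs budgeted for reduce tasks, which matches the separate map and reduce limits passed to the two constructors.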
6. Initialize RPC and obtain the JobTracker client used for heartbeat communication.
7. Create a background thread that listens for map-completion events:
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
mapEventsFetcher.setName(
    "Map-events fetcher for all reduce tasks " + "on " +
    taskTrackerName);
mapEventsFetcher.start();
The background thread's run method is as follows:
public void run() {
  while (running) {
    try {
      List<FetchStatus> fList = null;
      synchronized (runningJobs) {
        while (((fList = reducesInShuffle()).size()) == 0) {
          try {
            runningJobs.wait();
          } catch (InterruptedException e) {
            LOG.info("Shutting down: " + this.getName());
            return;
          }
        }
      }
      // now fetch all the map task events for all the reduce tasks
      // possibly belonging to different jobs
      boolean fetchAgain = false; //flag signifying whether we want to fetch
                                  //immediately again.
      for (FetchStatus f : fList) {
        long currentTime = System.currentTimeMillis();
        try {
          //the method below will return true when we have not
          //fetched all available events yet
          if (f.fetchMapCompletionEvents(currentTime)) {
            fetchAgain = true;
          }
        } catch (Exception e) {
          LOG.warn(
              "Ignoring exception that fetch for map completion" +
              " events threw for " + f.jobId + " threw: " +
              StringUtils.stringifyException(e));
        }
        if (!running) {
          break;
        }
      }
      synchronized (waitingOn) {
        try {
          if (!fetchAgain) {
            waitingOn.wait(heartbeatInterval);
          }
        } catch (InterruptedException ie) {
          LOG.info("Shutting down: " + this.getName());
          return;
        }
      }
    } catch (Exception e) {
      LOG.info("Ignoring exception " + e.getMessage());
    }
  }
}
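The loop above is a classic wait/notify pattern: the thread blocks on runningJobs while no reduce is shuffling, makes one fetch pass over all shuffling reduces, and then waits up to heartbeatInterval on waitingOn unless some fetch reported that more events are pending. The minimal Java sketch below reproduces that control flow under simplifying assumptions: FetchSource, MapEventsFetcherSketch, addShufflingReduce and shutdown are hypothetical names, and the list of shuffling reduces is maintained explicitly instead of being recomputed by reducesInShuffle().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FetchStatus: returns true when not all available
// map-completion events were fetched yet, i.e. another pass is wanted right away.
interface FetchSource {
    boolean fetchMapCompletionEvents(long now) throws Exception;
}

class MapEventsFetcherSketch extends Thread {
    private final List<FetchSource> shuffling = new ArrayList<>(); // guarded by itself
    private final Object waitingOn = new Object();
    private final long heartbeatInterval;
    private volatile boolean running = true;

    MapEventsFetcherSketch(long heartbeatIntervalMs) {
        this.heartbeatInterval = heartbeatIntervalMs;
        setDaemon(true); // a daemon thread does not keep the JVM alive on shutdown
        setName("Map-events fetcher (sketch)");
    }

    /** Registers a reduce that entered the shuffle phase and wakes the fetcher. */
    void addShufflingReduce(FetchSource f) {
        synchronized (shuffling) {
            shuffling.add(f);
            shuffling.notifyAll();
        }
    }

    void shutdown() {
        running = false;
        interrupt(); // needed to get the thread out of either wait() call
    }

    /** One pass over all shuffling reduces; true if any of them wants an immediate re-fetch. */
    static boolean fetchRound(List<FetchSource> sources) {
        boolean fetchAgain = false;
        for (FetchSource f : sources) {
            try {
                if (f.fetchMapCompletionEvents(System.currentTimeMillis())) {
                    fetchAgain = true;
                }
            } catch (Exception e) {
                // As in the original loop: log and carry on with the other reduces.
                System.err.println("Ignoring exception from fetch: " + e);
            }
        }
        return fetchAgain;
    }

    @Override
    public void run() {
        while (running) {
            List<FetchSource> snapshot;
            synchronized (shuffling) {
                while (shuffling.isEmpty()) { // nothing to do: block instead of spinning
                    try {
                        shuffling.wait();
                    } catch (InterruptedException e) {
                        return; // shutdown
                    }
                }
                snapshot = new ArrayList<>(shuffling); // fetch outside the lock
            }
            boolean fetchAgain = fetchRound(snapshot);
            synchronized (waitingOn) {
                try {
                    if (!fetchAgain) {
                        waitingOn.wait(heartbeatInterval); // back off one heartbeat
                    }
                } catch (InterruptedException e) {
                    return; // shutdown
                }
            }
        }
    }
}
```

Catching exceptions per reduce inside fetchRound, as the original does, keeps one misbehaving job from stalling event delivery to every other job on the node.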
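8. initializeMemoryManagement: initialize the TaskTracker's memory settings, such as the per-slot memory sizes for map and reduce tasks.
9. Create background launcher threads for map and reduce tasks: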
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
mapLauncher.start();
reduceLauncher.start();
These threads are later used to create the child JVMs that execute map and reduce tasks.
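Each launcher is essentially a producer/consumer queue guarded by a slot count: the heartbeat path queues newly assigned tasks, and the launcher thread starts one only when a slot is free. The minimal Java sketch below illustrates that idea under the simplifying assumption that every task takes exactly one slot; TaskLauncherSketch, addToTaskQueue, addFreeSlots and pollLaunchable are names chosen for this sketch, and startNewTask is passed in as a callback rather than being the real TaskTracker method.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical, simplified launcher: a FIFO of assigned tasks plus a count of
// free slots; a task is started only when a slot is available (one slot per task).
class TaskLauncherSketch implements Runnable {
    private final Object lock = new Object();
    private final Deque<String> tasksToLaunch = new ArrayDeque<>();
    private final Consumer<String> startNewTask; // callback standing in for startNewTask
    private int numFreeSlots;

    TaskLauncherSketch(int maxSlots, Consumer<String> startNewTask) {
        this.numFreeSlots = maxSlots;
        this.startNewTask = startNewTask;
    }

    /** Producer side: called when a heartbeat response assigns a new task. */
    void addToTaskQueue(String taskId) {
        synchronized (lock) {
            tasksToLaunch.addLast(taskId);
            lock.notifyAll();
        }
    }

    /** Called when a task finishes and gives its slot back. */
    void addFreeSlots(int n) {
        synchronized (lock) {
            numFreeSlots += n;
            lock.notifyAll();
        }
    }

    /** Non-blocking: claims a slot and returns the next task, or null if none can start yet. */
    String pollLaunchable() {
        synchronized (lock) {
            if (tasksToLaunch.isEmpty() || numFreeSlots < 1) {
                return null;
            }
            numFreeSlots--;
            return tasksToLaunch.removeFirst();
        }
    }

    int freeSlots() {
        synchronized (lock) {
            return numFreeSlots;
        }
    }

    /** Consumer side: block until a task can start, then start it outside the lock. */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            String task;
            synchronized (lock) {
                try {
                    while ((task = pollLaunchable()) == null) {
                        lock.wait(); // woken by addToTaskQueue or addFreeSlots
                    }
                } catch (InterruptedException e) {
                    return; // shutdown
                }
            }
            startNewTask.accept(task);
        }
    }
}
```

Starting one instance per task type on its own daemon thread mirrors step 9. The callback runs outside the lock because starting a task can be slow (it involves localization), and holding the lock would stop the heartbeat path from queueing more work in the meantime.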
Next, let's look at TaskRunner's run method, which is where each task's child JVM is actually launched:
// Excerpt from the body of TaskRunner.run(); the enclosing try/catch is omitted
//before preparing the job localize
//all the archives
TaskAttemptID taskid = t.getTaskID();
final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
//simply get the location of the workDir and pass it to the child. The
//child will do the actual dir creation
final File workDir =
    new File(new Path(localdirs,
        TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
            taskid.toString(),
            t.isTaskCleanupTask())).toString());

String user = tip.getUGI().getUserName();

// Set up the child task's configuration. After this call, no localization
// of files should happen in the TaskTracker's process space. Any changes to
// the conf object after this will NOT be reflected to the child.
// setupChildTaskConfiguration(lDirAlloc);

if (!prepare()) {
  return;
}

// Accumulates class paths for child.
List<String> classPaths = getClassPaths(conf, workDir,
    taskDistributedCacheManager);

long logSize = TaskLog.getTaskLogLength(conf);

// Build exec child JVM args.
Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

// set memory limit using ulimit if feasible and necessary ...
String setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
File stdout = logFiles[0];
File stderr = logFiles[1];
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
    stderr);

Map<String, String> env = new HashMap<String, String>();
errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
    logSize);

// flatten the env as a set of export commands
List<String> setupCmds = new ArrayList<String>();
for (Entry<String, String> entry : env.entrySet()) {
  StringBuffer sb = new StringBuffer();
  sb.append("export ");
  sb.append(entry.getKey());
  sb.append("=\"");
  sb.append(entry.getValue());
  sb.append("\"");
  setupCmds.add(sb.toString());
}
setupCmds.add(setup);

launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
  if (!killed && exitCode != 0) {
    if (exitCode == 65) {
      tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
    }
    throw new IOException("Task process exit with nonzero status of " +
        exitCode + ".");
  }
}
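This run method prepares a child JVM for the current task: it sets up the working directory, the environment, the JVM arguments and the launch command, and then calls launchJvmAndWait, which ultimately goes through the TaskController to start the new JVM that executes the task.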
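Two pieces of that method are easy to isolate: flattening the environment map into `export KEY="VALUE"` lines followed by the setup command, and the exit-code check after the child JVM returns. The minimal Java sketch below mirrors those two steps of the run method shown above; ChildLaunchSketch, buildSetupCmds and checkExit are hypothetical names, not Hadoop methods.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helpers mirroring two steps of the run method above.
class ChildLaunchSketch {

    /** Flattens env into shell export lines and appends the JVM setup command (e.g. a ulimit). */
    static List<String> buildSetupCmds(Map<String, String> env, String setup) {
        List<String> setupCmds = new ArrayList<>();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            // Same shape as the original StringBuffer code: export KEY="VALUE".
            // The value is not escaped, so '"' or '$' inside it reach the shell as-is.
            setupCmds.add("export " + entry.getKey() + "=\"" + entry.getValue() + "\"");
        }
        setupCmds.add(setup);
        return setupCmds;
    }

    /** Mirrors the post-launch check: a non-zero exit of a task that was not killed is an error. */
    static void checkExit(boolean exitCodeSet, boolean killed, int exitCode) throws IOException {
        if (exitCodeSet && !killed && exitCode != 0) {
            // The original additionally reports exit code 65 via taskFailedPing before throwing.
            throw new IOException("Task process exit with nonzero status of " + exitCode + ".");
        }
    }
}
```

A killed task is expected to exit with a non-zero status, which is why the `killed` flag short-circuits the check instead of turning every kill into a reported failure.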
The relationships between the classes involved are shown in the class diagram below:
The call chain ends at TaskController's launchTask.
10. Finally, start the node health monitor with startHealthMonitor(this.fConf).
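Now back to TaskLauncher's run method: it loops continuously, picking up the new tasks assigned to the TaskTracker and calling startNewTask on each one. startNewTask eventually reaches TaskInProgress.launchTask, whose core logic is: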
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
    this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
    this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
  localizeTask(task);
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
    this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  }
  setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
  this.runner.start();
  long now = System.currentTimeMillis();
  this.taskStatus.setStartTime(now);
  this.lastProgressReport = now;
}
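The state check above decides whether a runner may be started at all. As a minimal Java sketch, assuming a local enum RunState that only approximates TaskStatus.State: launching is allowed from UNASSIGNED and from the two *_UNCLEAN states (presumably the cleanup attempt for a failed or killed task), and only UNASSIGNED is moved to RUNNING. LaunchGuardSketch and its fields are hypothetical names.

```java
// Hypothetical approximation of TaskStatus.State; only the names matter here.
enum RunState {
    UNASSIGNED, RUNNING, SUCCEEDED, FAILED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
}

// Hypothetical model of the guard at the top of the launch code above.
class LaunchGuardSketch {
    RunState state;
    int runnersStarted = 0; // counts notional createRunner(...).start() calls

    LaunchGuardSketch(RunState initial) {
        this.state = initial;
    }

    /** Returns true if a runner was started for the task. */
    synchronized boolean launchTask() {
        boolean launchable = state == RunState.UNASSIGNED
                || state == RunState.FAILED_UNCLEAN
                || state == RunState.KILLED_UNCLEAN;
        if (!launchable) {
            return false; // already running or finished: do not start a second runner
        }
        // localizeTask(task) would run here in the original.
        if (state == RunState.UNASSIGNED) {
            state = RunState.RUNNING; // *_UNCLEAN states are left unchanged
        }
        runnersStarted++; // stands in for setTaskRunner(...) + runner.start()
        return true;
    }
}
```

Because a repeated call on the same task finds it already RUNNING, the guard also makes a duplicated launch request harmless.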
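TaskTracker's run method keeps a heartbeat going with the JobTracker, through which it receives new tasks and instructions to kill tasks. Let's focus on the heartbeat itself (this code is from transmitHeartBeat):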
synchronized (this) {
  askForNewTask =
    ((status.countOccupiedMapSlots() < maxMapSlots ||
      status.countOccupiedReduceSlots() < maxReduceSlots) &&
     acceptNewTasks);
  localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
  askForNewTask = enoughFreeSpace(localMinSpaceStart);
  long freeDiskSpace = getFreeSpace();
  long totVmem = getTotalVirtualMemoryOnTT();
  long totPmem = getTotalPhysicalMemoryOnTT();
  long availableVmem = getAvailableVirtualMemoryOnTT();
  long availablePmem = getAvailablePhysicalMemoryOnTT();
  long cumuCpuTime = getCumulativeCpuTimeOnTT();
  long cpuFreq = getCpuFrequencyOnTT();
  int numCpu = getNumProcessorsOnTT();
  float cpuUsage = getCpuUsageOnTT();

  status.getResourceStatus().setAvailableSpace(freeDiskSpace);
  status.getResourceStatus().setTotalVirtualMemory(totVmem);
  status.getResourceStatus().setTotalPhysicalMemory(totPmem);
  status.getResourceStatus().setMapSlotMemorySizeOnTT(
      mapSlotMemorySizeOnTT);
  status.getResourceStatus().setReduceSlotMemorySizeOnTT(
      reduceSlotSizeMemoryOnTT);
  status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
  status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
  status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
  status.getResourceStatus().setCpuFrequency(cpuFreq);
  status.getResourceStatus().setNumProcessors(numCpu);
  status.getResourceStatus().setCpuUsage(cpuUsage);
}
//add node health information

TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
synchronized (this) {
  if (healthChecker != null) {
    healthChecker.setHealthStatus(healthStatus);
  } else {
    healthStatus.setNodeHealthy(true);
    healthStatus.setLastReported(0L);
    healthStatus.setHealthReport("");
  }
}
//
// Xmit the heartbeat
//
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                          justStarted,
                                                          justInited,
                                                          askForNewTask,
                                                          heartbeatResponseId);
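This code packs the TaskTracker's resource figures (free disk space, virtual and physical memory, CPU) and its health status into the status object, sends them to the JobTracker by calling its heartbeat method, and then parses the returned HeartbeatResponse. The heartbeat mechanism is analyzed in detail in the next post.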
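The first part of that code decides whether to ask for new work at all. Below is a minimal Java sketch of the askForNewTask predicate, assuming the free-space test behaves like "no minimum configured, or free space above the minimum"; that is an approximation of enoughFreeSpace, not its actual code. HeartbeatDecisionSketch and its parameters are hypothetical.

```java
// Hypothetical sketch of the askForNewTask decision made before each heartbeat.
class HeartbeatDecisionSketch {

    /**
     * Ask the JobTracker for work only if at least one map or reduce slot is free,
     * the tracker is accepting tasks, and there is enough local disk space.
     */
    static boolean askForNewTask(int occupiedMapSlots, int maxMapSlots,
                                 int occupiedReduceSlots, int maxReduceSlots,
                                 boolean acceptNewTasks,
                                 long freeDiskBytes, long minSpaceStartBytes) {
        boolean hasFreeSlot = occupiedMapSlots < maxMapSlots
                || occupiedReduceSlots < maxReduceSlots;
        if (!(hasFreeSlot && acceptNewTasks)) {
            return false;
        }
        // Approximates enoughFreeSpace(localMinSpaceStart): 0 means "no minimum".
        return minSpaceStartBytes == 0 || freeDiskBytes > minSpaceStartBytes;
    }
}
```

The slot check is an OR: a tracker whose map slots are all busy still asks for work if a reduce slot is free, and the JobTracker then decides which kind of task to assign.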