昊漫玉 发表于 2016-12-10 06:20:11

Hadoop之TaskTracker分析(转)

TaskTracker的工作职责之前已经和大家提过,主要负责维护,申请和监控Task,通过heartbeat和JobTracker进行通信。
     TaskTracker的init过程:
     1.读取配置文件,解析参数
     2.将TaskTracker上原有的用户local files删除,并重新创建所需的dir和file
     3. Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map
     4.    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();记录task的链表
            this.runningJobs = new TreeMap<JobID, RunningJob>();记录job的id信息
     5.初始化JVMManager:
 view plaincopyprint? 



[*]mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),   
[*]      true, tracker);  
[*]  reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),  
[*]      false, tracker);  

       6.初始化RPC,获取JobTracker client用于heartbeat通信;
 
     7.new一个 后台线程用于监听map完成的事件
 view plaincopyprint? 



[*]this.mapEventsFetcher = new MapEventsFetcherThread();  
[*]mapEventsFetcher.setDaemon(true);  
[*]mapEventsFetcher.setName(  
[*]                         "Map-events fetcher for all reduce tasks " + "on " +   
[*]                         taskTrackerName);  
[*]mapEventsFetcher.start();  

      后台线程的run方法如下:
 
 
 view plaincopyprint? 



[*]while (running) {  
[*]       try {  
[*]         List <FetchStatus> fList = null;  
[*]         synchronized (runningJobs) {  
[*]           while (((fList = reducesInShuffle()).size()) == 0) {  
[*]             try {  
[*]               runningJobs.wait();  
[*]             } catch (InterruptedException e) {  
[*]               LOG.info("Shutting down: " + this.getName());  
[*]               return;  
[*]             }  
[*]           }  
[*]         }  
[*]         // now fetch all the map task events for all the reduce tasks  
[*]         // possibly belonging to different jobs  
[*]         boolean fetchAgain = false; //flag signifying whether we want to fetch  
[*]                                     //immediately again.  
[*]         for (FetchStatus f : fList) {  
[*]           long currentTime = System.currentTimeMillis();  
[*]           try {  
[*]             //the method below will return true when we have not   
[*]             //fetched all available events yet  
[*]             if (f.fetchMapCompletionEvents(currentTime)) {  
[*]               fetchAgain = true;  
[*]             }  
[*]           } catch (Exception e) {  
[*]             LOG.warn(  
[*]                      "Ignoring exception that fetch for map completion" +  
[*]                      " events threw for " + f.jobId + " threw: " +  
[*]                      StringUtils.stringifyException(e));   
[*]           }  
[*]           if (!running) {  
[*]             break;  
[*]           }  
[*]         }  
[*]         synchronized (waitingOn) {  
[*]           try {  
[*]             if (!fetchAgain) {  
[*]               waitingOn.wait(heartbeatInterval);  
[*]             }  
[*]           } catch (InterruptedException ie) {  
[*]             LOG.info("Shutting down: " + this.getName());  
[*]             return;  
[*]           }  
[*]         }  
[*]       } catch (Exception e) {  
[*]         LOG.info("Ignoring exception "  + e.getMessage());  
[*]       }  
[*]     }  
[*]   }   

  8.initializeMemoryManagement,初始化TaskTracker上每个Task的内存设置
 
9.new一个Map和Reducer的Launcher后台线程
 
 view plaincopyprint? 



[*]mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);  
[*] reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);  
[*] mapLauncher.start();  
[*] reduceLauncher.start();  

  用于后面创建子JVM来执行map、reduce task
 
看一下
 view plaincopyprint? 



[*]TaskRunner的run方法:  
[*] //before preparing the job localize   
[*]      //all the archives  
[*]      TaskAttemptID taskid = t.getTaskID();  
[*]      final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");  
[*]      //simply get the location of the workDir and pass it to the child. The  
[*]      //child will do the actual dir creation  
[*]      final File workDir =  
[*]      new File(new Path(localdirs,   
[*]          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),   
[*]          taskid.toString(),  
[*]          t.isTaskCleanupTask())).toString());  
[*]        
[*]      String user = tip.getUGI().getUserName();  
[*]        
[*]      // Set up the child task's configuration. After this call, no localization  
[*]      // of files should happen in the TaskTracker's process space. Any changes to  
[*]      // the conf object after this will NOT be reflected to the child.  
[*]      // setupChildTaskConfiguration(lDirAlloc);  
[*]  
[*]      if (!prepare()) {  
[*]        return;  
[*]      }  
[*]        
[*]      // Accumulates class paths for child.  
[*]      List<String> classPaths = getClassPaths(conf, workDir,  
[*]                                              taskDistributedCacheManager);  
[*]  
[*]      long logSize = TaskLog.getTaskLogLength(conf);  
[*]        
[*]      //  Build exec child JVM args.  
[*]      Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);  
[*]        
[*]      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);  
[*]  
[*]      // set memory limit using ulimit if feasible and necessary ...  
[*]      String setup = getVMSetupCmd();  
[*]      // Set up the redirection of the task's stdout and stderr streams  
[*]      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());  
[*]      File stdout = logFiles[0];  
[*]      File stderr = logFiles[1];  
[*]      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,  
[*]                 stderr);  
[*]        
[*]      Map<String, String> env = new HashMap<String, String>();  
[*]      errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,  
[*]                                   logSize);  
[*]        
[*]      // flatten the env as a set of export commands  
[*]      List <String> setupCmds = new ArrayList<String>();  
[*]      for(Entry<String, String> entry : env.entrySet()) {  
[*]        StringBuffer sb = new StringBuffer();  
[*]        sb.append("export ");  
[*]        sb.append(entry.getKey());  
[*]        sb.append("=\"");  
[*]        sb.append(entry.getValue());  
[*]        sb.append("\"");  
[*]        setupCmds.add(sb.toString());  
[*]      }  
[*]      setupCmds.add(setup);  
[*]        
[*]      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);  
[*]      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());  
[*]      if (exitCodeSet) {  
[*]        if (!killed && exitCode != 0) {  
[*]          if (exitCode == 65) {  
[*]            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());  
[*]          }  
[*]          throw new IOException("Task process exit with nonzero status of " +  
[*]              exitCode + ".");  
[*]        }  
[*]      }  
[*]    }  

  run方法为当前task启动一个child JVM:为其设置文件路径、上下文环境、JVM启动参数和启动命令等信息,然后调用TaskController的方法启动新的JVM来执行对应的Task。
 
各个类关系图如下所示:

最后以TaskController的launchTask截止
10.然后开始  startHealthMonitor(this.fConf);
 
 
再来看看TaskLauncher的run方法,就是不停地循环去获取TaskTracker中新的task,然后调用startNewTask方法(下面的代码片段取自startNewTask最终调用到的TaskInProgress.launchTask)
 
 view plaincopyprint? 



[*]if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||  
[*]         this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||  
[*]         this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {  
[*]       localizeTask(task);  
[*]       if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {  
[*]         this.taskStatus.setRunState(TaskStatus.State.RUNNING);  
[*]       }  
[*]       setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));  
[*]       this.runner.start();  
[*]       long now = System.currentTimeMillis();  
[*]       this.taskStatus.setStartTime(now);  
[*]       this.lastProgressReport = now;  

  TaskTracker的run方法:通过心跳与JobTracker保持通信,以获取新的Task或杀掉已有的Task,重点看一下heartbeat通信过程:
 
 
 view plaincopyprint? 



[*]synchronized (this) {  
[*]     askForNewTask =   
[*]       ((status.countOccupiedMapSlots() < maxMapSlots ||   
[*]         status.countOccupiedReduceSlots() < maxReduceSlots) &&   
[*]        acceptNewTasks);   
[*]     localMinSpaceStart = minSpaceStart;  
[*]   }  
[*]   if (askForNewTask) {  
[*]     askForNewTask = enoughFreeSpace(localMinSpaceStart);  
[*]     long freeDiskSpace = getFreeSpace();  
[*]     long totVmem = getTotalVirtualMemoryOnTT();  
[*]     long totPmem = getTotalPhysicalMemoryOnTT();  
[*]     long availableVmem = getAvailableVirtualMemoryOnTT();  
[*]     long availablePmem = getAvailablePhysicalMemoryOnTT();  
[*]     long cumuCpuTime = getCumulativeCpuTimeOnTT();  
[*]     long cpuFreq = getCpuFrequencyOnTT();  
[*]     int numCpu = getNumProcessorsOnTT();  
[*]     float cpuUsage = getCpuUsageOnTT();  
[*]  
[*]     status.getResourceStatus().setAvailableSpace(freeDiskSpace);  
[*]     status.getResourceStatus().setTotalVirtualMemory(totVmem);  
[*]     status.getResourceStatus().setTotalPhysicalMemory(totPmem);  
[*]     status.getResourceStatus().setMapSlotMemorySizeOnTT(  
[*]         mapSlotMemorySizeOnTT);  
[*]     status.getResourceStatus().setReduceSlotMemorySizeOnTT(  
[*]         reduceSlotSizeMemoryOnTT);  
[*]     status.getResourceStatus().setAvailableVirtualMemory(availableVmem);   
[*]     status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);  
[*]     status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);  
[*]     status.getResourceStatus().setCpuFrequency(cpuFreq);  
[*]     status.getResourceStatus().setNumProcessors(numCpu);  
[*]     status.getResourceStatus().setCpuUsage(cpuUsage);  
[*]   }  
[*]   //add node health information  
[*]     
[*]   TaskTrackerHealthStatus healthStatus = status.getHealthStatus();  
[*]   synchronized (this) {  
[*]     if (healthChecker != null) {  
[*]       healthChecker.setHealthStatus(healthStatus);  
[*]     } else {  
[*]       healthStatus.setNodeHealthy(true);  
[*]       healthStatus.setLastReported(0L);  
[*]       healthStatus.setHealthReport("");  
[*]     }  
[*]   }  
[*]   //  
[*]   // Xmit the heartbeat  
[*]   //  
[*]   HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
[*]                                                             justStarted,  
[*]                                                             justInited,  
[*]                                                             askForNewTask,   
[*]                                                             heartbeatResponseId);  

  
该方法主要将TaskTracker上的各种性能参数信息反馈给JobTracker,调用其heartbeat方法然后解析返回的结果,下篇详细分析heartbeat机制
页: [1]
查看完整版本: Hadoop之TaskTracker分析(转)