q29191 发表于 2016-12-8 11:00:05

Hadoop之JobTrack分析(转)

1.client端指定Job的各种参数配置之后调用job.waitForCompletion(true) 方法提交Job给JobTracker,等待Job 完成。
 view plaincopyprint? 



[*]public void submit() throws IOException, InterruptedException,   
[*]                             ClassNotFoundException {  
[*]   ensureState(JobState.DEFINE);//检查JobState状态  
[*]   setUseNewAPI();//检查及设置是否使用新的MapReduce API  
[*]     
[*]   // Connect to the JobTracker and submit the job  
[*]   connect();//链接JobTracker  
[*]   info = jobClient.submitJobInternal(conf);//将job信息提交  
[*]   super.setJobID(info.getID());  
[*]   state = JobState.RUNNING;//更改job状态  
[*]  }  

  
以上代码主要有两步骤,连接JobTracker并提交Job信息。connect方法主要是实例化JobClient对象,包括设置JobConf和init工作:
 
 
 view plaincopyprint? 



[*]public void init(JobConf conf) throws IOException {  
[*]    String tracker = conf.get("mapred.job.tracker", "local");//读取配置文件信息用于判断该Job是运行于本地单机模式还是分布式模式  
[*]    tasklogtimeout = conf.getInt(  
[*]      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);  
[*]    this.ugi = UserGroupInformation.getCurrentUser();  
[*]    if ("local".equals(tracker)) {//如果是单机模式,new LocalJobRunner   
[*]      conf.setNumMapTasks(1);  
[*]      this.jobSubmitClient = new LocalJobRunner(conf);  
[*]    } else {  
[*]      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
[*]    }          
[*]  }  

  分布式模式下就会创建一个RPC代理链接:
 
 
 view plaincopyprint? 



[*]public static VersionedProtocol getProxy(  
[*]      Class<? extends VersionedProtocol> protocol,  
[*]      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,  
[*]      Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {  
[*]  
[*]    if (UserGroupInformation.isSecurityEnabled()) {  
[*]      SaslRpcServer.init(conf);  
[*]    }  
[*]    VersionedProtocol proxy =  
[*]        (VersionedProtocol) Proxy.newProxyInstance(  
[*]            protocol.getClassLoader(), new Class[] { protocol },  
[*]            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));  
[*]    long serverVersion = proxy.getProtocolVersion(protocol.getName(),   
[*]                                                  clientVersion);  
[*]    if (serverVersion == clientVersion) {  
[*]      return proxy;  
[*]    } else {  
[*]      throw new VersionMismatch(protocol.getName(), clientVersion,   
[*]                                serverVersion);  
[*]    }  
[*]  }  

从上述代码可以看出hadoop实际上使用了Java自带的Proxy API来实现Remote Procedure Call
初始完之后,需要提交job
 view plaincopyprint? 



[*]info = jobClient.submitJobInternal(conf);//将job信息提交  

  submit方法做以下几件事情:
 
         1.将conf中目录名字替换成hdfs代理的名字
         2.检查output是否合法:比如路径是否已经存在,是否是明确的
         3.将数据分成多个split并放到hdfs上面,写入job.xml文件
         4.调用JobTracker的submitJob方法
        该方法主要新建JobInProgress对象,然后检查访问权限和系统参数是否满足job,最后addJob:
     
 view plaincopyprint? 



[*]private synchronized JobStatus addJob(JobID jobId, JobInProgress job)   
[*] throws IOException {  
[*]   totalSubmissions++;  
[*]  
[*]   synchronized (jobs) {  
[*]     synchronized (taskScheduler) {  
[*]       jobs.put(job.getProfile().getJobID(), job);  
[*]       for (JobInProgressListener listener : jobInProgressListeners) {  
[*]         listener.jobAdded(job);  
[*]       }  
[*]     }  
[*]   }  
[*]   myInstrumentation.submitJob(job.getJobConf(), jobId);  
[*]   job.getQueueMetrics().submitJob(job.getJobConf(), jobId);  
[*]  
[*]   LOG.info("Job " + jobId + " added successfully for user '"   
[*]            + job.getJobConf().getUser() + "' to queue '"   
[*]            + job.getJobConf().getQueueName() + "'");  
[*]   AuditLogger.logSuccess(job.getUser(),   
[*]       Operation.SUBMIT_JOB.name(), jobId.toString());  
[*]   return job.getStatus();  
[*] }  

  totalSubmissions记录client端提交job到JobTracker的次数。而jobs则是JobTracker所有可以管理的job的映射表
 
Map<JobID, JobInProgress> jobs =  Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
taskScheduler是用于调度job先后执行策略的,其类图如下所示:
 
 
 



 
hadoop job调度机制;
public enum SchedulingMode {
  FAIR, FIFO
}
1.公平调度FairScheduler
   对于每个用户而言,分布式资源是公平分配的,每个用户都有一个job池,假若某个用户目前所占有的资源很多,对于其他用户而言是不公平的,那么调度器就会杀掉占有资源多的用户的一些task,释放资源供他人使用
2.容量调度JobQueueTaskScheduler
在分布式系统上维护多个队列,每个队列都有一定的容量,每个队列中的job按照FIFO的策略进行调度。队列中可以包含队列。
两个Scheduler都要实现TaskScheduler的public synchronized List<Task> assignTasks(TaskTracker tracker)方法,该方法通过具体的计算生成可以分配的task
 
接下来看看JobTracker的工作:
记录更新JobTracker重试的次数:
 view plaincopyprint? 



[*]while (true) {  
[*]     try {  
[*]       recoveryManager.updateRestartCount();  
[*]       break;  
[*]     } catch (IOException ioe) {  
[*]       LOG.warn("Failed to initialize recovery manager. ", ioe);  
[*]       // wait for some time  
[*]       Thread.sleep(FS_ACCESS_RETRY_PERIOD);  
[*]       LOG.warn("Retrying...");  
[*]     }  
[*]   }  

  启动Job调度器,默认是FairScheduler:
 taskScheduler.start();主要是初始化一些管理对象,比如job pool管理池
 
 
 view plaincopyprint? 



[*]// Initialize other pieces of the scheduler  
[*]  jobInitializer = new JobInitializer(conf, taskTrackerManager);  
[*]  taskTrackerManager.addJobInProgressListener(jobListener);  
[*]  poolMgr = new PoolManager(this);  
[*]  poolMgr.initialize();  
[*]  loadMgr = (LoadManager) ReflectionUtils.newInstance(  
[*]      conf.getClass("mapred.fairscheduler.loadmanager",   
[*]          CapBasedLoadManager.class, LoadManager.class), conf);  
[*]  loadMgr.setTaskTrackerManager(taskTrackerManager);  
[*]  loadMgr.setEventLog(eventLog);  
[*]  loadMgr.start();  
[*]  taskSelector = (TaskSelector) ReflectionUtils.newInstance(  
[*]      conf.getClass("mapred.fairscheduler.taskselector",   
[*]          DefaultTaskSelector.class, TaskSelector.class), conf);  
[*]  taskSelector.setTaskTrackerManager(taskTrackerManager);  
[*]  taskSelector.start();  

 view plaincopyprint? 



[*]JobInitializer有一个确定大小的ExecutorService threadPool,每个thread用于初始化job  

 view plaincopyprint? 



[*]try {  
[*]     JobStatus prevStatus = (JobStatus)job.getStatus().clone();  
[*]     LOG.info("Initializing " + job.getJobID());  
[*]     job.initTasks();  
[*]     // Inform the listeners if the job state has changed  
[*]     // Note : that the job will be in PREP state.  
[*]     JobStatus newStatus = (JobStatus)job.getStatus().clone();  
[*]     if (prevStatus.getRunState() != newStatus.getRunState()) {  
[*]       JobStatusChangeEvent event =   
[*]         new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,   
[*]             newStatus);  
[*]       synchronized (JobTracker.this) {  
[*]         updateJobInProgressListeners(event);  
[*]       }  
[*]     }  
[*]   }  

  初始化操作主要用于初始化生成tasks然后通知其他的监听者执行其他操作。initTasks主要处理以下工作:
 
 
 view plaincopyprint? 



[*]// 记录用户提交的运行的job信息  
[*]   try {  
[*]   userUGI.doAs(new PrivilegedExceptionAction<Object>() {  
[*]     @Override  
[*]     public Object run() throws Exception {  
[*]       JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,   
[*]           startTimeFinal, hasRestarted());  
[*]       return null;  
[*]     }  
[*]   });  
[*]   } catch(InterruptedException ie) {  
[*]     throw new IOException(ie);  
[*]   }  
[*]     
[*]   // 设置并记录job的优先级  
[*]   setPriority(this.priority);  
[*]     
[*]   //  
[*]   //生成每个Task需要的密钥  
[*]   //  
[*]   generateAndStoreTokens();  
[*]     

  然后读取JobTracker split的数据的元信息,元信息包括以下属性信息:
 
 
 view plaincopyprint? 



[*]private TaskSplitIndex splitIndex;//洗牌后的索引位置  
[*]  private long inputDataLength;//洗牌后数据长度  
[*]  private String[] locations;//数据存储位置  

  
然后根据元信息的长度来计算numMapTasks并校验数据存储地址是否可以连接
 
接下来生成map tasks和reducer tasks:
 
 view plaincopyprint? 



[*]maps = new TaskInProgress;  
[*] for(int i=0; i < numMapTasks; ++i) {  
[*]   inputLength += splits.getInputDataLength();  
[*]   maps = new TaskInProgress(jobId, jobFile,   
[*]                                splits,   
[*]                                jobtracker, conf, this, i, numSlotsPerMap);  
[*] }  

 view plaincopyprint? 



[*]this.jobFile = jobFile;  
[*] this.splitInfo = split;  
[*] this.jobtracker = jobtracker;  
[*] this.job = job;  
[*] this.conf = conf;  
[*] this.partition = partition;  
[*] this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);  
[*] this.numSlotsRequired = numSlotsRequired;  
[*] setMaxTaskAttempts();  
[*] init(jobid);  

  以上除了task对应的jobTracker,split信息和job信息外,还设置了
 view plaincopyprint? 



[*]maxSkipRecords ---记录task执行的时候最大可以跳过的错误记录数;  
[*]<pre name="code" class="java">setMaxTaskAttempts--设置task最多可以执行的次数。当一个task执行两次都失败了之后,会以skip mode模式再重新执行一次,记录那些bad record,  
[*]然后第四次再执行的时候,跳过这些bad records</pre><p></p>  
[*]<pre></pre>  
[*]新建reducer task的过程也很类似。  
[*]<p></p>  
[*]<p><br>  
[*]</p>  
[*]<p><br>  
[*]</p>  
[*]<p><br>  
[*]</p>  
[*]<p><br>  
[*]</p>  
[*]<p></p>  
页: [1]
查看完整版本: Hadoop之JobTrack分析(转)