设为首页 收藏本站
查看: 879|回复: 0

[经验分享] Hadoop之JobTrack分析(转)

[复制链接]

尚未签到

发表于 2016-12-8 11:00:05 | 显示全部楼层 |阅读模式
1.client端指定Job的各种参数配置之后调用job.waitForCompletion(true) 方法提交Job给JobTracker,等待Job 完成。
[java] view plaincopyprint? 



  • public void submit() throws IOException, InterruptedException,   
  •                              ClassNotFoundException {  
  •    ensureState(JobState.DEFINE);//检查JobState状态  
  •    setUseNewAPI();//检查及设置是否使用新的MapReduce API  
  •      
  •    // Connect to the JobTracker and submit the job  
  •    connect();//链接JobTracker  
  •    info = jobClient.submitJobInternal(conf);//将job信息提交  
  •    super.setJobID(info.getID());  
  •    state = JobState.RUNNING;//更改job状态  
  •   }  

  
以上代码主要有两步骤,连接JobTracker并提交Job信息。connect方法主要是实例化JobClient对象,包括设置JobConf和init工作:
 
 
[java] view plaincopyprint? 



  • public void init(JobConf conf) throws IOException {  
  •     String tracker = conf.get("mapred.job.tracker""local");//读取配置文件信息用于判断该Job是运行于本地单机模式还是分布式模式  
  •     tasklogtimeout = conf.getInt(  
  •       TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);  
  •     this.ugi = UserGroupInformation.getCurrentUser();  
  •     if ("local".equals(tracker)) {//如果是单机模式,new LocalJobRunner   
  •       conf.setNumMapTasks(1);  
  •       this.jobSubmitClient = new LocalJobRunner(conf);  
  •     } else {  
  •       this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
  •     }          
  •   }  

  分布式模式下就会创建一个RPC代理链接:
 
 
[java] view plaincopyprint? 



  • public static VersionedProtocol getProxy(  
  •       Class<? extends VersionedProtocol> protocol,  
  •       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,  
  •       Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {  
  •   
  •     if (UserGroupInformation.isSecurityEnabled()) {  
  •       SaslRpcServer.init(conf);  
  •     }  
  •     VersionedProtocol proxy =  
  •         (VersionedProtocol) Proxy.newProxyInstance(  
  •             protocol.getClassLoader(), new Class[] { protocol },  
  •             new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));  
  •     long serverVersion = proxy.getProtocolVersion(protocol.getName(),   
  •                                                   clientVersion);  
  •     if (serverVersion == clientVersion) {  
  •       return proxy;  
  •     } else {  
  •       throw new VersionMismatch(protocol.getName(), clientVersion,   
  •                                 serverVersion);  
  •     }  
  •   }  

从上述代码可以看出hadoop实际上使用了Java自带的Proxy API来实现Remote Procedure Call
初始完之后,需要提交job
[java] view plaincopyprint? 



  • info = jobClient.submitJobInternal(conf);//将job信息提交  

  submit方法做以下几件事情:
 
         1.将conf中目录名字替换成hdfs代理的名字
         2.检查output是否合法:比如路径是否已经存在,是否是明确的
         3.将数据分成多个split并放到hdfs上面,写入job.xml文件
         4.调用JobTracker的submitJob方法
        该方法主要新建JobInProgress对象,然后检查访问权限和系统参数是否满足job,最后addJob:
     
[java] view plaincopyprint? 



  • private synchronized JobStatus addJob(JobID jobId, JobInProgress job)   
  •  throws IOException {  
  •    totalSubmissions++;  
  •   
  •    synchronized (jobs) {  
  •      synchronized (taskScheduler) {  
  •        jobs.put(job.getProfile().getJobID(), job);  
  •        for (JobInProgressListener listener : jobInProgressListeners) {  
  •          listener.jobAdded(job);  
  •        }  
  •      }  
  •    }  
  •    myInstrumentation.submitJob(job.getJobConf(), jobId);  
  •    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);  
  •   
  •    LOG.info("Job " + jobId + " added successfully for user '"   
  •             + job.getJobConf().getUser() + "' to queue '"   
  •             + job.getJobConf().getQueueName() + "'");  
  •    AuditLogger.logSuccess(job.getUser(),   
  •        Operation.SUBMIT_JOB.name(), jobId.toString());  
  •    return job.getStatus();  
  •  }  

  totalSubmissions记录client端提交job到JobTracker的次数。而jobs则是JobTracker所有可以管理的job的映射表
 
Map<JobID, JobInProgress> jobs =  Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
taskScheduler是用于调度job先后执行策略的,其类图如下所示:
 
 
 
DSC0000.gif


 
hadoop job调度机制;
public enum SchedulingMode {
  FAIR, FIFO
}
1.公平调度FairScheduler
   对于每个用户而言,分布式资源是公平分配的,每个用户都有一个job池,假若某个用户目前所占有的资源很多,对于其他用户而言是不公平的,那么调度器就会杀掉占有资源多的用户的一些task,释放资源供他人使用
2.容量调度JobQueueTaskScheduler
在分布式系统上维护多个队列,每个队列都有一定的容量,每个队列中的job按照FIFO的策略进行调度。队列中可以包含队列。

两个Scheduler都要实现TaskScheduler的public synchronized List<Task> assignTasks(TaskTracker tracker)方法,该方法通过具体的计算生成可以分配的task
 
接下来看看JobTracker的工作:
记录更新JobTracker重试的次数:
[java] view plaincopyprint? 



  • while (true) {  
  •      try {  
  •        recoveryManager.updateRestartCount();  
  •        break;  
  •      } catch (IOException ioe) {  
  •        LOG.warn("Failed to initialize recovery manager. ", ioe);  
  •        // wait for some time  
  •        Thread.sleep(FS_ACCESS_RETRY_PERIOD);  
  •        LOG.warn("Retrying...");  
  •      }  
  •    }  

  启动Job调度器,默认是FairScheduler:
 taskScheduler.start();主要是初始化一些管理对象,比如job pool管理池
 
 
[java] view plaincopyprint? 



  • // Initialize other pieces of the scheduler  
  •   jobInitializer = new JobInitializer(conf, taskTrackerManager);  
  •   taskTrackerManager.addJobInProgressListener(jobListener);  
  •   poolMgr = new PoolManager(this);  
  •   poolMgr.initialize();  
  •   loadMgr = (LoadManager) ReflectionUtils.newInstance(  
  •       conf.getClass("mapred.fairscheduler.loadmanager",   
  •           CapBasedLoadManager.class, LoadManager.class), conf);  
  •   loadMgr.setTaskTrackerManager(taskTrackerManager);  
  •   loadMgr.setEventLog(eventLog);  
  •   loadMgr.start();  
  •   taskSelector = (TaskSelector) ReflectionUtils.newInstance(  
  •       conf.getClass("mapred.fairscheduler.taskselector",   
  •           DefaultTaskSelector.class, TaskSelector.class), conf);  
  •   taskSelector.setTaskTrackerManager(taskTrackerManager);  
  •   taskSelector.start();  

[java] view plaincopyprint? 



  • JobInitializer有一个确定大小的ExecutorService threadPool,每个thread用于初始化job  

[java] view plaincopyprint? 



  • try {  
  •      JobStatus prevStatus = (JobStatus)job.getStatus().clone();  
  •      LOG.info("Initializing " + job.getJobID());  
  •      job.initTasks();  
  •      // Inform the listeners if the job state has changed  
  •      // Note : that the job will be in PREP state.  
  •      JobStatus newStatus = (JobStatus)job.getStatus().clone();  
  •      if (prevStatus.getRunState() != newStatus.getRunState()) {  
  •        JobStatusChangeEvent event =   
  •          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,   
  •              newStatus);  
  •        synchronized (JobTracker.this) {  
  •          updateJobInProgressListeners(event);  
  •        }  
  •      }  
  •    }  

  初始化操作主要用于初始化生成tasks然后通知其他的监听者执行其他操作。initTasks主要处理以下工作:
 
 
[java] view plaincopyprint? 



  • // 记录用户提交的运行的job信息  
  •    try {  
  •    userUGI.doAs(new PrivilegedExceptionAction<Object>() {  
  •      @Override  
  •      public Object run() throws Exception {  
  •        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,   
  •            startTimeFinal, hasRestarted());  
  •        return null;  
  •      }  
  •    });  
  •    } catch(InterruptedException ie) {  
  •      throw new IOException(ie);  
  •    }  
  •      
  •    // 设置并记录job的优先级  
  •    setPriority(this.priority);  
  •      
  •    //  
  •    //生成每个Task需要的密钥  
  •    //  
  •    generateAndStoreTokens();  
  •      

  然后读取JobTracker split的数据的元信息,元信息包括以下属性信息:
 
 
[java] view plaincopyprint? 



  • private TaskSplitIndex splitIndex;//洗牌后的索引位置  
  •   private long inputDataLength;//洗牌后数据长度  
  •   private String[] locations;//数据存储位置  

  
然后根据元信息的长度来计算numMapTasks并校验数据存储地址是否可以连接
 
接下来生成map tasks和reducer tasks:
 
[java] view plaincopyprint? 



  • maps = new TaskInProgress[numMapTasks];  
  •  for(int i=0; i < numMapTasks; ++i) {  
  •    inputLength += splits.getInputDataLength();  
  •    maps = new TaskInProgress(jobId, jobFile,   
  •                                 splits,   
  •                                 jobtracker, conf, this, i, numSlotsPerMap);  
  •  }  

[java] view plaincopyprint? 



  • this.jobFile = jobFile;  
  •  this.splitInfo = split;  
  •  this.jobtracker = jobtracker;  
  •  this.job = job;  
  •  this.conf = conf;  
  •  this.partition = partition;  
  •  this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);  
  •  this.numSlotsRequired = numSlotsRequired;  
  •  setMaxTaskAttempts();  
  •  init(jobid);  

  以上除了task对应的jobTracker,split信息和job信息外,还设置了
[java] view plaincopyprint? 



  • maxSkipRecords ---记录task执行的时候最大可以跳过的错误记录数;  
  • <pre name="code" class="java">setMaxTaskAttempts--设置task最多可以执行的次数。当一个task执行两次都失败了之后,会以skip mode模式再重新执行一次,记录那些bad record,  
  • 然后第四次再执行的时候,跳过这些bad records</pre><p></p>  
  • <pre></pre>  
  • 新建reducer task的过程也很类似。  
  • <p></p>  
  • <p><br>  
  • </p>  
  • <p><br>  
  • </p>  
  • <p><br>  
  • </p>  
  • <p><br>  
  • </p>  
  • <p></p>  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311435-1-1.html 上篇帖子: 运用hadoop计算TF-IDF 下篇帖子: hadoop-hdfs整体结构剖析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表