设为首页 收藏本站
查看: 914|回复: 0

[经验分享] hadoop任务调度详解

[复制链接]

尚未签到

发表于 2016-12-9 06:24:19 | 显示全部楼层 |阅读模式
 
      hadoop任务调度详解
    任何作业的运行,首先得从用户端提交作业给jobTracker开始。
  

  用户端:
         用户程序通过job类的submit方法向jobTracker提交作业,job类使用jobClient类来做一系列工作.

  (1)向jobTracker申请作业ID号。

  (2)检查作业的输出格式是否正确,比如作业的输出目录是否存在或已经存在,若输出格式不正确,会将错误信息返回给用户端的控制台。(实际我们也经常遇到这种错误,the directory already exists.)
  
        (3)检查输入目录是否存在。

   (4)创建job.split,job.xml文件,将这些文件放在该job对应ID号的目录下。(每个Job在hdfs上会有对应的ID的目录)

  (5)根据input format格式获得相应的input split,将input split信息写入到job.split文件中。(input split信息包括split的大小,split的内容,此split在文件的位置,以及此split在哪个datanode下。)
  (6)将job所需的jar包上传到hdfs上,并且将job的配置信息写入到job.xml文件中.

  (7)将job提交给jobTracker.

  JobTracker

         在jobTracker的构造函数中,会生成taskScheduler成员变量,该成员变量负责调度job,它默认格式是JobQueueTaskScheduler,它采用的是FIFO的调度方式,在jobTracker的offService()中调用taskScheduler.start()方法,在该start()方法中有两个监听器 ,一个是jobQueueJobInprogressListener(用于监听job的运行),一个是eagerTaskInitializationListener(用于对job的初始化),在eagerTaskInitializationListener中启动一个jobInitManagerThread线程,不断从jobInitQueue队列中得到jobInProgress对象,调用jobInprogress中的initTasks()方法完成job初始化,jobInprogress对象是在job提交给jobTracker时通过调用jobAdd()方法添加到jobInitQueue队列中的。initTasks()方法中主要实现的操作有:
    (1)从hdfs上读取job.split文件从而生成input splits.设定map Task的个数(也就是input splits的个数)。
   (2)为每个map task生成TaskInprogress对象.
              
         maps = new TaskInProgress(jobId, jobFile,
splits,
jobtracker, conf, this, i, numSlotsPerMap);

  (3)在将这些map task放入到nonRunningMapCache中.(当taskTracker具备运行map task环境时,直接从cache中取)。
  (4)为每个reduce task生成TaskInprogress对象,并将其放入到nonRunningReduces中(当taskTracker具备运行reduce task环境时,直接从nonRunningReduces中取)。
  (5)创建两个cleanup task,一个用于清理map ,一个用于清理reduce.
  (6)创建两个setup task,一个初始化map,一个初始化reduce.
  当jobTracker收到taskTracker调用heartbeat()方法后,首先会检查上一次心跳是否完成,如果一切正常,则会处理心跳。jobTracker会调用taskScheduler组装任务到任务列表中,具体实现在asignTasks()方法中,tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));得到这些任务之后,便将其封装在一些LauchTaskAction中,发回给taskTracker,让它去执行。


  TaskTracker
  taskTracker会每隔一段时间通过调用transmitHeartBeat()方法放送一次心跳给jobTracker,在transmitHeartBeat()方法中,taskTracker首先会检查目前执行task的任务以及磁盘的使用情况,

askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots || (当前taskTracker运行map或reduce task的个数小于最大可运行的map 或reduce task的个数)
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);  localMinSpaceStart = minSpaceStart;
  如果可以接收新的任务,则将askForNewTask参数设置成true.向jobTracker发送心跳。

HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted,
justInited,
askForNewTask,
heartbeatResponseId);
   
  这是RPC的调用,获得hearbeatResponse信息。通过调用heartbeatResponse.getActions()获得jobTracker传过来的taskTrackerAction数组.遍历数组,看是否具有launchTaskAction,如果有则将其加入到队列中,调用addToTaskQueue,如果是map task,则放入到mapLaucher中,如果是reduce task,则放入到reduceLaucher中。taskLaucher是一个线程,它从以上队列中获得taskInprogress对象,然后调用startNewTask()方法来启动一个task.该task启动时,会将该task运行所需要的文件(job.xml,job.split,job.jar)从hdfs上拷贝到本地文件系统中。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311509-1-1.html 上篇帖子: Hadoop简单的Map/Reduce 下篇帖子: Hadoop HDFS架构和设计
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表