美奇科技 posted on 2016-12-10 09:02:44

Hadoop Source Code Analysis: The MapReduce Job Submission Process

  The command is:
  hadoop_debug jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow.txt /user/admin/out/555
  Execution starts in org.apache.hadoop.util.RunJar.main:
   public static void main(String[] args) throws Throwable {
     // Load the jar: /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar
     JarFile jarFile = new JarFile(fileName);

     // The META-INF manifest names the main class: org/apache/hadoop/examples/ExampleDriver
     Manifest manifest = jarFile.getManifest();
     if (manifest != null) {
       mainClassName = manifest.getMainAttributes().getValue("Main-Class");
     }

     // Create the local temp directory /tmp/hadoop-admin
     File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
     tmpDir.mkdirs();

     // Create the local working directory /tmp/hadoop-admin/hadoop-unjar4705742737164408087
     final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
     workDir.delete();
     workDir.mkdirs();

     // Delete /tmp/hadoop-admin/hadoop-unjar4705742737164408087 when the JVM exits
     Runtime.getRuntime().addShutdownHook(new Thread() {
         public void run() {
           try {
             FileUtil.fullyDelete(workDir);
           } catch (IOException e) {
           }
         }
       });

     // Unpack the jar into /tmp/hadoop-admin/hadoop-unjar4705742737164408087
     unJar(file, workDir);

     // Add the work dir, the jar itself, workDir/classes/, and everything
     // under workDir/lib to the classpath
     classPath.add(new File(workDir + "/").toURL());
     classPath.add(file.toURL());
     classPath.add(new File(workDir, "classes/").toURL());
     File[] libs = new File(workDir, "lib").listFiles();
     if (libs != null) {
       for (int i = 0; i < libs.length; i++) {
         classPath.add(libs[i].toURL());
       }
     }

     // Run the main method
     main.invoke(null, new Object[] { newArgs });
   }
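
  Between building the classpath and the final invoke, RunJar wires the URLs into a URLClassLoader and resolves the main method reflectively. A condensed sketch of that elided step (class shape and argument handling here are illustrative, not verbatim source):

    import java.lang.reflect.Method;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.Arrays;
    import java.util.List;

    public class RunJarSketch {
      static void launch(List<URL> classPath, String mainClassName,
                         String[] args, int firstArg) throws Throwable {
        // Everything unpacked from the jar becomes visible to this loader.
        ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));
        Thread.currentThread().setContextClassLoader(loader);

        Class<?> mainClass = Class.forName(mainClassName, true, loader);
        Method main = mainClass.getMethod("main", String[].class);

        // Drop the arguments already consumed by the hadoop script; the
        // manifest main class receives {"wordcount", <input>, <output>}.
        String[] newArgs = Arrays.asList(args)
            .subList(firstArg, args.length).toArray(new String[0]);
        main.invoke(null, new Object[] { newArgs });
      }
    }

  ExampleDriver, the manifest main class, then maps the "wordcount" argument to the WordCount class via its ProgramDriver table.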
  Setting the job properties (the comments show the underlying configuration keys):
job.setJarByClass(WordCount.class);          // mapred.jar
job.setMapperClass(WordCountMap.class);      // mapreduce.map.class
job.setReducerClass(WordCountReduce.class);  // mapreduce.reduce.class
job.setCombinerClass(WordCountReduce.class); // mapreduce.combine.class
job.setMapOutputKeyClass(Text.class);        // mapred.mapoutput.key.class
job.setMapOutputValueClass(IntWritable.class); // mapred.mapoutput.value.class
job.setOutputKeyClass(Text.class);             // mapred.output.key.class
job.setOutputValueClass(IntWritable.class);    // mapred.output.value.class
job.setJobName("WordCount");                  // mapred.job.name
 
FileInputFormat.addInputPath(job, input);     // mapred.input.dir
FileOutputFormat.setOutputPath(job, output);  // mapred.output.dir
 
 
job.submit()
 
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ......
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    ......
   }
 
 
Connecting to the JobTracker:
 
private void connect() throws IOException, InterruptedException {
        ......
        jobClient = new JobClient((JobConf) getConfiguration());   
        ......
       
  }
 
  where:
  public JobClient(JobConf conf) throws IOException {
    ......
    init(conf);
    }
public void init(JobConf conf) throws IOException {
     ......
     this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }
  private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {
    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr,
        UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  }
   
  At this point the client holds an RPC proxy implementing JobSubmissionProtocol, i.e. a local stand-in for the remote JobTracker.
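
  What RPC.getProxy hands back is a java.lang.reflect.Proxy whose InvocationHandler turns every interface call into a network request. A stripped-down illustration of the mechanism (a toy, not Hadoop's actual Invoker, which serializes the method name and Writable arguments over a socket to addr):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    public class RpcProxySketch {
      // Stand-in for the real JobSubmissionProtocol interface.
      interface JobSubmissionProtocol {
        String getNewJobId();
      }

      public static void main(String[] args) {
        InvocationHandler invoker = new InvocationHandler() {
          public Object invoke(Object proxy, Method method, Object[] a) {
            // The real Invoker writes method.getName() and the arguments
            // to the JobTracker's socket and reads back the reply;
            // here we fake a canned response.
            System.out.println("would send RPC: " + method.getName());
            return "job_201404010621_0004";
          }
        };
        JobSubmissionProtocol jt = (JobSubmissionProtocol) Proxy.newProxyInstance(
            JobSubmissionProtocol.class.getClassLoader(),
            new Class<?>[] { JobSubmissionProtocol.class }, invoker);
        System.out.println(jt.getNewJobId());  // looks like a local call
      }
    }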
   
Obtaining the job staging area
 
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
              jobCopy);
  RPC request: JobSubmissionProtocol.getStagingAreaDir()
  Returns: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging

  RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
  Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b, i.e. the directory exists

  RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
  Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554, used to check owner and permissions
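
  These two getFileInfo calls are driven by JobSubmissionFiles.getStagingDir, which first checks whether .staging exists and then verifies it belongs to the submitting user with 700 permissions. A simplified sketch of that logic (method shape and error text are mine, not the exact source):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class StagingDirSketch {
      // Verify or create the per-user .staging directory with 700 permissions.
      static Path getStagingDir(Path stagingArea, Configuration conf,
                                String currentUser) throws IOException {
        FileSystem fs = stagingArea.getFileSystem(conf);
        FsPermission perms = new FsPermission((short) 0700);
        if (fs.exists(stagingArea)) {                        // getFileInfo #1
          FileStatus status = fs.getFileStatus(stagingArea); // getFileInfo #2
          if (!status.getOwner().equals(currentUser)
              || !status.getPermission().equals(perms)) {
            throw new IOException("Bad ownership/permissions on " + stagingArea);
          }
        } else {
          fs.mkdirs(stagingArea, perms);
        }
        return stagingArea;
      }
    }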
   
  Obtaining a new JobID
  JobID jobId = jobSubmitClient.getNewJobId();
   
  RPC request: JobSubmissionProtocol.getNewJobId()
  Returns: job_201404010621_0004 (the ID combines the JobTracker start timestamp, 201404010621, with a job sequence number, 0004)
   
  Creating the job submission directory:
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
 
  hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004
   
  Copying the jar to HDFS
  copyAndConfigureFiles(jobCopy, submitJobDir);
   
  RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)
  Returns: null

  RPC request: ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwxr-xr-x)
  Returns: true

  RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwx------)
  Returns: null

  RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar)
  Returns: null, i.e. it does not exist yet

  RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261, null)
  Returns: org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701
  Block: blk_6689254996395759186_2720
  BlockToken: Ident: , Pass: , Kind: , Service:
  DataNode:

  RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261)
  Returns: true

  RPC request: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, 10)
  Returns: true

  RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rw-r--r--)
  Returns: null

  RPC request: ClientProtocol.renewLease(DFSClient_-1317833261)
  Returns: null
  From this point on, a daemon thread keeps sending renewLease requests.
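
  The renewLease traffic comes from a daemon inside DFSClient that pings the NameNode periodically, so that files this client still has open for write are not treated as abandoned. Conceptually it behaves like this sketch (the NameNodeStub interface and interval handling are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class LeaseRenewerSketch {
      interface NameNodeStub { void renewLease(String clientName) throws Exception; }

      private final AtomicBoolean clientRunning = new AtomicBoolean(true);

      void start(final NameNodeStub namenode, final String clientName,
                 final long renewalIntervalMs) {
        Thread leaseChecker = new Thread(new Runnable() {
          public void run() {
            while (clientRunning.get()) {
              try {
                namenode.renewLease(clientName); // the recurring renewLease RPC
                Thread.sleep(renewalIntervalMs); // well under the soft-lease limit
              } catch (InterruptedException ie) {
                return;
              } catch (Exception e) {
                // log and retry: losing the lease would let another client
                // reclaim files this one still holds open for write
              }
            }
          }
        });
        leaseChecker.setDaemon(true);
        leaseChecker.start();
      }
    }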
   
  At this point the local file /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar has been copied into HDFS as /tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar.
   
  Number of reduces:
   int reduces = jobCopy.getNumReduceTasks();
  The number of reduce tasks here is 2 (read from the job configuration, mapred.reduce.tasks).
   
  Checking the output directory
  RPC request: ClientProtocol.getFileInfo(/user/admin/out/555)
  Returns: null, i.e. the output directory does not exist, as required
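
  A null return is exactly what the submitter wants: checkOutputSpecs rejects the job up front if the output directory already exists, which is far cheaper than failing after the maps have run. A simplified sketch of the check (shape condensed from FileOutputFormat.checkOutputSpecs):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileAlreadyExistsException;

    public class OutputCheckSketch {
      static void checkOutputSpecs(Path outDir, Configuration conf)
          throws IOException {
        if (outDir == null) {
          throw new IOException("Output directory not set.");
        }
        FileSystem fs = outDir.getFileSystem(conf);
        if (fs.exists(outDir)) {                 // the getFileInfo RPC above
          throw new FileAlreadyExistsException(
              "Output directory " + outDir + " already exists");
        }
      }
    }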
   
  Computing the input splits:
   int maps = writeSplits(context, submitJobDir);
  where:
 private <T extends InputSplit> int writeNewSplits(JobContext job,
     Path jobSubmitDir) throws IOException,
     InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
     
  where:
  public List<InputSplit> getSplits(JobContext job
                                    ) throws IOException {
   ...........
    }
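
  Simplified, the per-file logic inside getSplits is: carve the file into splitSize-byte pieces and tag each piece with the hosts of the block it starts in. A sketch under that simplification (the real FileInputFormat also honors configurable min/max split sizes and a 10% slop factor on the last split):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SplitSketch {
      // One split per splitSize bytes; the crude blkIndex below assumes
      // splitSize == blockSize, which holds in this walkthrough.
      static List<InputSplit> splitsFor(FileSystem fs, FileStatus file,
                                        long splitSize) throws Exception {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        long length = file.getLen();
        BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, length);
        long remaining = length;
        while (remaining > 0) {
          long start = length - remaining;
          long size = Math.min(splitSize, remaining);
          int blkIndex = (int) (start / splitSize);
          splits.add(new FileSplit(file.getPath(), start, size,
                                   blocks[blkIndex].getHosts()));
          remaining -= size;
        }
        return splits;
      }
    }

  With splitSize equal to the 67108864-byte block size, the 201000000-byte yellow.txt yields pieces of 67108864 + 67108864 + 66782272 bytes, exactly the three FileSplits traced below.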
   
  RPC request: ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)
  Returns: path="hdfs://server1:9000/user/admin/in/yellow.txt", length=201000000, isdir=false, block_replication=3, blocksize=67108864, permission=rw-r--r--, owner=Admin, group=supergroup

  RPC request: ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt, 0, 201000000)
  Returns: 3 BlockLocations
  offset={0},         length={67108864}, hosts={server3,server2}, names={ }, topologyPaths={ }
  offset={67108864},  length={67108864}, hosts={server3,server2}, names={ }, topologyPaths={ }
  offset={134217728}, length={66782272}, hosts={server3,server2}, names={ }, topologyPaths={ }
    
  The final split information is 3 FileSplits:
  FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={ }, length={67108864}, start={0}
  FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={ }, length={67108864}, start={67108864}
  FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={ }, length={66782272}, start={134217728}

  The number of map tasks is therefore 3:
  jobCopy.setNumMapTasks(maps);
   
  Writing the split files:
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
          jobSubmitDir.getFileSystem(conf), array);
  RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rw-r--r--)
  Returns: null

  RPC request: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, 10)
  Returns: true

RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261, null)
Returns: a LocatedBlock object:

  Block: blockid=-921399365952861077, generationStamp=2714, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]:
   offset: 0

  RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261)
  Returns: true
   
  The SplitMetaInfo written comprises one record per FileSplit above; each record holds the split's locations, its start offset within job.split, and its input data length.
   
   RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rw-r--r--)
  Returns: null

  RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261, null)
Returns: a LocatedBlock object:

  Block: blockid=789965327875207186, generationStamp=2715, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]:
   offset: 0

  RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261)
  Returns: true
  
  Setting access control
  RPC request: JobSubmissionProtocol.getQueueAdmins(default)
  Returns: All users are allowed
   
  Writing the job file to the JobTracker's filesystem:
  RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)
  Returns: an output stream

  RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rw-r--r--)
  Returns: null

  RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261, null)
Returns: a LocatedBlock object:

  Block: blockid=-7725157033540829125, generationStamp=2716, numBytes=0
  BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:
  DatanodeInfo[]:
   offset: 0

  RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261)
  Returns: true
   
  此时"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/" 下生成文件 job.xml,包含了所有的配置信息.
  此时HDFS目录"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/" 下面文件为:
  -rw-r--r--  10 admin  supergroup     142465 2014-04-08 00:20  job.jar
  -rw-r--r--  10 admin  supergroup        334 2014-04-08 00:45     job.split
  -rw-r--r--   3 admin  supergroup         80 2014-04-08 00:50       job.splitmetainfo
  -rw-r--r--   3 admin supergroup  20416 2014-04-08 00:55 job.xml
  job.jar 为运行的Jar包,   job.split内容 为(FileSplit 对象), job.splitmetainfo 内容 为(SplitMetaInfo 对象),job.xml 为job的配置文件
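
  The listing can be reproduced with the FileSystem client API; a small sketch (host and paths as in this walkthrough):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListStaging {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://server1:9000"), conf);
        Path jobDir = new Path(
            "/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004");
        for (FileStatus st : fs.listStatus(jobDir)) {
          // permission, replication, length, file name
          System.out.printf("%s %2d %8d %s%n", st.getPermission(),
              st.getReplication(), st.getLen(), st.getPath().getName());
        }
      }
    }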
   
  Submitting the job:
status = jobSubmitClient.submitJob(
                jobId, submitJobDir.toString(), jobCopy.getCredentials());
   
  RPC request: JobSubmissionProtocol.submitJob(job_201404010621_0004, hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, org.apache.hadoop.security.Credentials@70677770)
  Returns: JobStatus: setupProgress=0, mapProgress=0, reduceProgress=0, cleanupProgress=0, runState=4 (PREP), priority=NORMAL, ...

  RPC request: JobSubmissionProtocol.getJobProfile(job_201404010621_0004)
  Returns: JobProfile: jobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, jobID=job_201404010621_0004, name=WordCount, queue=default, url=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004, user=Admin
   
  Combining the JobStatus and the JobProfile, the client prints:
Job: job_201404010621_0004
file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml
tracking URL: http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004
map() completion: 0.0
reduce() completion: 0.0
   
  Monitoring the job status:
   jobClient.monitorAndPrintJob(conf, info);
   
  RPC request: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
  Returns: setupProgress=1, mapProgress=1, reduceProgress=0.22222224, cleanupProgress=1, runState=1 (RUNNING), priority=NORMAL

  RPC request: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
  Returns: setupProgress=1, mapProgress=1, reduceProgress=1, cleanupProgress=1, runState=2 (SUCCEEDED), priority=NORMAL

  i.e. map 100% reduce 100%.
  Several more JobSubmissionProtocol.getJobStatus(job_201404010621_0004) requests follow before the client collects the final results.
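
  monitorAndPrintJob is essentially a polling loop over getJobStatus. A condensed sketch against the RunningJob handle (the real method also prints task completion events and, at the end, the counters, which is what the remaining RPCs fetch):

    import org.apache.hadoop.mapred.RunningJob;

    public class MonitorSketch {
      static void monitor(RunningJob rj) throws Exception {
        while (!rj.isComplete()) {              // a getJobStatus RPC per pass
          System.out.printf("map %.0f%% reduce %.0f%%%n",
              rj.mapProgress() * 100, rj.reduceProgress() * 100);
          Thread.sleep(1000);
        }
        System.out.println(rj.isSuccessful()
            ? "Job complete: " + rj.getID() : "Job failed");
      }
    }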
   
  RPC request: JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004, 0, 10)
  Returns:

RPC request: JobSubmissionProtocol.getJobCounters(job_201404010621_0004)
Returns:
       Job Counters
              Launched reduce tasks=2
              SLOTS_MILLIS_MAPS=293879
              Total time spent by all reduces waiting after reserving slots (ms)=0
              Total time spent by all maps waiting after reserving slots (ms)=0
              Launched map tasks=4
              Data-local map tasks=4
              SLOTS_MILLIS_REDUCES=74342
       File Output Format Counters
              Bytes Written=933
       FileSystemCounters
              FILE_BYTES_READ=316152
              HDFS_BYTES_READ=201008521
              FILE_BYTES_WRITTEN=370366
              HDFS_BYTES_WRITTEN=933
       File Input Format Counters
              Bytes Read=201008194
       Map-Reduce Framework
              Map output materialized bytes=2574
              Map input records=15600000
              Reduce shuffle bytes=2574
              Spilled Records=23025
              Map output bytes=356000000
              Total committed heap usage (bytes)=378023936
              CPU time spent (ms)=158350
              Combine input records=41011850
              SPLIT_RAW_BYTES=327
              Reduce input records=225
              Reduce input groups=75
              Combine output records=12075
              Physical memory (bytes) snapshot=650371072
              Reduce output records=75
              Virtual memory (bytes) snapshot=5300277248
              Map output records=41000000