fenghzy 发表于 2018-11-1 09:07:17

Hadoop源代码分析之DatanodeProtocol(sendHeartbeat方法的调用)

  1、心跳机制
  

  
心跳的机制大概是这样的:
  
1) master启动的时候,会开一个ipc server在那里。
  
2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
  

  2、找到心跳的代码
  

  
拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳
    3、心跳的底层细节一:datanode怎么获得namenode对象的?
  

  
首先,DataNode类中,有一个namenode的成员变量:
  Java代码


[*]public class DataNode extends Configured
[*]    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
[*]...
[*]public DatanodeProtocol namenode = null;
[*]...
[*]}

  


  
public>  

  
...
  

  
public DatanodeProtocol namenode = null;
  

    4、心跳的底层细节二:看看Invoker类
  

  
Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类:
  Java代码


[*]private static class Invoker implements InvocationHandler {

  


  
   private static>  

  在这个类中,看invoke方法
  5、心跳的底层细节三:Invocation类
  

  
Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类
  

  
没有什么业务逻辑方法,主要作用就是一个VO
  6、心跳的底层细节四:client类的call方法
  

  
接下来重点看client类的call方法:
  Java代码


[*]public Writable call(Writable param, InetSocketAddress addr,
[*]                     Class protocol, UserGroupInformation ticket)
[*]                     throws InterruptedException, IOException {
[*]
[*]Call call = new Call(param);
[*]// 将Invocation转化为Call
[*]Connection connection = getConnection(addr, protocol, ticket, call);
[*]// 连接远程服务器
[*]connection.sendParam(call);               // send the parameter
[*]// 将“序列化”后的call发给过去
[*]boolean interrupted = false;
[*]synchronized (call) {
[*]    while (!call.done) {
[*]      try {
[*]      call.wait();                           // wait for the result
[*]// 等待调用结果
[*]      } catch (InterruptedException ie) {
[*]      // save the fact that we were interrupted
[*]      interrupted = true;
[*]      }
[*]    }
[*]
[*]    if (interrupted) {
[*]      // set the interrupt flag now that we are done waiting
[*]      Thread.currentThread().interrupt();
[*]    }
[*]
[*]    if (call.error != null) {
[*]      if (call.error instanceof RemoteException) {
[*]      call.error.fillInStackTrace();
[*]      throw call.error;
[*]      } else { // local exception
[*]      throw wrapException(addr, call.error);
[*]      }
[*]    } else {
[*]      return call.value;
[*]// 返回
[*]    }
[*]}
[*]}
  

  
public Writable call(Writable param, InetSocketAddress addr,
  

  
                     Class protocol, UserGroupInformation ticket)
  

  
                     throws InterruptedException, IOException {
  

  

  

  
    Call call = new Call(param);
  

  
   // 将Invocation转化为Call
  

  
    Connection connection = getConnection(addr, protocol, ticket, call);
  

  
   // 连接远程服务器
  

  
    connection.sendParam(call);               // send the parameter
  

  
   // 将“序列化”后的call发给过去
  

  
    boolean interrupted = false;
  

  
    synchronized (call) {
  

  
      while (!call.done) {
  

  
      try {
  

  
          call.wait();                           // wait for the result
  

  
   // 等待调用结果
  

  
      } catch (InterruptedException ie) {
  

  
          // save the fact that we were interrupted
  

  
          interrupted = true;
  

  
      }
  

  
      }
  

  

  

  
      if (interrupted) {
  

  
      // set the interrupt flag now that we are done waiting
  

  
      Thread.currentThread().interrupt();
  

  
      }
  

  

  

  
      if (call.error != null) {
  

  
      if (call.error instanceof RemoteException) {
  

  
          call.error.fillInStackTrace();
  

  
          throw call.error;
  

  
      } else { // local exception
  

  
          throw wrapException(addr, call.error);
  

  
      }
  

  
      } else {
  

  
      return call.value;
  

  
   // 返回
  

  
      }
  

  
    }
  

  
}
  

  7、现在,一目了然了
  Java代码


[*]datanode向namenode发送heartbeat过程是这样的:
[*]
[*]    a) 在datanode初始化获得namenode的proxy
[*]    b) 在datanode上,调用namenode proxy的heartbeat方法:
[*]      namenode.sendHeartbeat(dnRegistration,
[*]                                                       data.getCapacity(),
[*]                                                       data.getDfsUsed(),
[*]                                                       data.getRemaining(),
[*]                                                       xmitsInProgress.get(),
[*]                                                       getXceiverCount());
[*]    c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法
[*]    d) client call方法将Invocation转化为Call对象
[*]    e) client 将call发送到真正的namenode服务器
[*]    f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!
[*]    g) datanode接收结果,并将结果转化为DatanodeCommand[]
[*]


页: [1]
查看完整版本: Hadoop源代码分析之DatanodeProtocol(sendHeartbeat方法的调用)