Hadoop Source Code Analysis: DatanodeProtocol (How sendHeartbeat Is Invoked)
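1. The heartbeat mechanism
The heartbeat mechanism works roughly as follows:
1) When the master starts, it opens an IPC server and listens for connections.
2) When a slave starts, it connects to the master and, every 3 seconds, proactively sends it a "heartbeat" that reports the slave's status. The master uses the heartbeat's return value to pass instructions back to the slave.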
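The key point of the mechanism is that communication is always initiated by the slave: the master never calls the slave, it only answers heartbeats and attaches instructions to the answer. The following is a minimal in-memory sketch of that request/reply shape. The class and method names (`HeartbeatSketch`, `Master`, `enqueue`, `heartbeat`) and the string commands are hypothetical illustrations, not Hadoop APIs, and no networking is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the heartbeat idea (not Hadoop's API):
// a slave reports its status, and the master piggybacks pending
// instructions on the reply.
class HeartbeatSketch {

    // Stand-in for the master (the namenode in HDFS).
    static class Master {
        private final Map<String, Long> lastSeen = new HashMap<>();
        private final Map<String, Long> remaining = new HashMap<>();
        private final Map<String, List<String>> pending = new HashMap<>();

        // Queue an instruction to deliver on the node's next heartbeat.
        void enqueue(String nodeId, String command) {
            pending.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(command);
        }

        // Invoked by a slave: record liveness and status, return queued commands.
        List<String> heartbeat(String nodeId, long remainingBytes, long nowMillis) {
            lastSeen.put(nodeId, nowMillis);
            remaining.put(nodeId, remainingBytes);
            List<String> cmds = pending.remove(nodeId);
            return cmds == null ? new ArrayList<String>() : cmds;
        }

        Long lastSeen(String nodeId) {
            return lastSeen.get(nodeId);
        }

        Long reportedRemaining(String nodeId) {
            return remaining.get(nodeId);
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        master.enqueue("dn-1", "DELETE blk_42");
        // The first heartbeat carries the queued command back to the slave.
        System.out.println(master.heartbeat("dn-1", 1024L, 3000L)); // [DELETE blk_42]
        // Nothing is queued any more, so the next reply is empty.
        System.out.println(master.heartbeat("dn-1", 1024L, 6000L)); // []
    }
}
```

Because instructions ride on the heartbeat reply, a command queued for a node is delivered at most one heartbeat interval later.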
2. Finding the heartbeat code
Take the namenode and datanode as an example: in the datanode's offerService method, a heartbeat is sent to the namenode every 3 seconds.
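"Every 3 seconds" is implemented as a check inside a loop rather than as a timer. As far as we know, the 0.20-era DataNode compares the time since `lastHeartbeat` with `heartBeatInterval`, which is read from `dfs.heartbeat.interval` (3 seconds by default). The sketch below isolates that timing logic with a fake clock so it can be tested. `HeartbeatTimer` and `simulate` are hypothetical names; the choice of `>=` and of treating the first heartbeat as immediately due are assumptions of this sketch, and Hadoop's exact comparison may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the timing logic in an offerService-style loop:
// send a heartbeat once the interval has elapsed, otherwise wait for the
// remainder. The clock is passed in so the behaviour is easy to test.
class HeartbeatTimer {
    private final long intervalMillis;
    private long lastHeartbeat; // when the last heartbeat was sent (ms)

    HeartbeatTimer(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        // Pretend a heartbeat was sent one interval ago so the first is due at once.
        this.lastHeartbeat = -intervalMillis;
    }

    // True (and records the time) if a heartbeat is due at 'now'.
    boolean dueAt(long now) {
        if (now - lastHeartbeat >= intervalMillis) {
            lastHeartbeat = now;
            return true;
        }
        return false;
    }

    // How long the loop may wait before the next heartbeat is due.
    long waitTime(long now) {
        return Math.max(0L, intervalMillis - (now - lastHeartbeat));
    }

    // Ticks a fake clock from 0 to endMillis and returns the heartbeat times.
    static List<Long> simulate(long intervalMillis, long tickMillis, long endMillis) {
        HeartbeatTimer timer = new HeartbeatTimer(intervalMillis);
        List<Long> sent = new ArrayList<>();
        for (long now = 0; now <= endMillis; now += tickMillis) {
            if (timer.dueAt(now)) {
                sent.add(now); // the real loop would call namenode.sendHeartbeat(...) here
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // 3 s interval, 1 s ticks, 10 s of simulated time.
        System.out.println(simulate(3000L, 1000L, 10000L)); // [0, 3000, 6000, 9000]
    }
}
```

Computing a wait time instead of sleeping a fixed 3 seconds lets the same loop do other work (such as block reports) between heartbeats without drifting.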
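3. Heartbeat internals, part 1: how does the datanode get hold of the namenode object?
First, the DataNode class has a namenode member variable. Its type is the DatanodeProtocol interface; at run time it refers not to the NameNode object itself but to a dynamic proxy for the namenode, obtained when the datanode initializes: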
public class DataNode extends Configured
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
  ...
  public DatanodeProtocol namenode = null;
  ...
}
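The field can hold "the namenode" only because of Java dynamic proxies: `java.lang.reflect.Proxy` can manufacture, at run time, an object that implements an interface and forwards every method call to an `InvocationHandler`. In the 0.20-era source this field appears to be filled in during startup via `RPC.waitForProxy(...)`, which builds such a proxy with `Invoker` as the handler; check `DataNode.startDataNode` in your version to confirm. The sketch below shows only the JDK mechanism. `MiniDatanodeProtocol` is a hypothetical, cut-down interface, and the handler merely logs the call instead of doing RPC.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical, cut-down interface standing in for DatanodeProtocol.
interface MiniDatanodeProtocol {
    String[] sendHeartbeat(String registration, long capacity, long dfsUsed, long remaining);
}

class ProxyDemo {
    // Returns an object that implements the interface without containing any
    // namenode logic: every method call is routed to the handler instead.
    static MiniDatanodeProtocol createProxy(StringBuilder log) {
        InvocationHandler handler = (proxy, method, args) -> {
            log.append(method.getName()).append('(').append(args.length).append(" args)");
            return new String[0]; // an RPC handler would return the remote result here
        };
        return (MiniDatanodeProtocol) Proxy.newProxyInstance(
                MiniDatanodeProtocol.class.getClassLoader(),
                new Class<?>[] {MiniDatanodeProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        MiniDatanodeProtocol namenode = createProxy(log);
        namenode.sendHeartbeat("dn-1", 100L, 40L, 60L);
        System.out.println(Proxy.isProxyClass(namenode.getClass())); // true
        System.out.println(log); // sendHeartbeat(4 args)
    }
}
```

To the calling code, `namenode.sendHeartbeat(...)` looks like an ordinary local call; everything RPC-specific lives in the handler.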
4. Heartbeat internals, part 2: the Invoker class
Invoker is a static nested class of org.apache.hadoop.ipc.RPC:
private static class Invoker implements InvocationHandler {
  ...
}
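In this class, look at the invoke method: every call made on the namenode proxy ends up here, where it is packaged into an Invocation object and handed to client.call.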
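The following sketch reproduces the shape of that path in a single process: the handler turns each method call into an invocation object and passes it, together with the declaring interface, to a client, and the "server" side resolves the method by name and parameter types and invokes it on the real implementation. All class names here (`HeartbeatProtocol`, `SimpleInvocation`, `LoopbackClient`, `SimpleInvoker`, `InvokerDemo`) are hypothetical. `LoopbackClient` replaces the network with a direct reflective call, which is an assumption made so the example runs on its own; Hadoop's server does, roughly, a similar `getMethod(...)`/`invoke(...)` lookup on its side.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol; not Hadoop's DatanodeProtocol.
interface HeartbeatProtocol {
    String[] sendHeartbeat(String registration, long remaining);
}

// Value object describing one call: which method, with which arguments.
final class SimpleInvocation {
    final String methodName;
    final Class<?>[] parameterClasses;
    final Object[] parameters;

    SimpleInvocation(Method method, Object[] parameters) {
        this.methodName = method.getName();
        this.parameterClasses = method.getParameterTypes();
        this.parameters = parameters;
    }
}

// Stand-in for the RPC client. Instead of a socket, it hands the invocation to
// an in-process "server" object and dispatches it reflectively.
final class LoopbackClient {
    private final Object server;

    LoopbackClient(Object server) {
        this.server = server;
    }

    Object call(SimpleInvocation inv, Class<?> protocol) throws Throwable {
        Method m = protocol.getMethod(inv.methodName, inv.parameterClasses);
        try {
            return m.invoke(server, inv.parameters);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the server's own exception
        }
    }
}

// Same shape as RPC.Invoker: wrap the call, pass it to the client, return the result.
final class SimpleInvoker implements InvocationHandler {
    private final LoopbackClient client;

    SimpleInvoker(LoopbackClient client) {
        this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return client.call(new SimpleInvocation(method, args), method.getDeclaringClass());
    }
}

class InvokerDemo {
    static HeartbeatProtocol proxyFor(HeartbeatProtocol server) {
        return (HeartbeatProtocol) Proxy.newProxyInstance(
                HeartbeatProtocol.class.getClassLoader(),
                new Class<?>[] {HeartbeatProtocol.class},
                new SimpleInvoker(new LoopbackClient(server)));
    }

    public static void main(String[] args) {
        // "Server-side" implementation; a real namenode runs in another process.
        HeartbeatProtocol impl = (registration, remaining) ->
                remaining < 100 ? new String[] {"STOP " + registration} : new String[0];
        HeartbeatProtocol namenode = proxyFor(impl);
        System.out.println(Arrays.toString(namenode.sendHeartbeat("dn-1", 50L))); // [STOP dn-1]
    }
}
```

Sending the method name plus parameter classes (rather than a `Method` object) is what makes the call transferable: both sides share the interface, so the receiver can look the method up again.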
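5. Heartbeat internals, part 3: the Invocation class
Invocation is also a static nested class of org.apache.hadoop.ipc.RPC.
It has no business-logic methods; it mainly serves as a value object (VO) that carries the details of the method call, namely the method name and its arguments.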
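Although Invocation has no business logic, it must be serializable so that it can cross the network; in Hadoop such objects implement the `Writable` interface (`write(DataOutput)` / `readFields(DataInput)`). Below is a minimal Writable-style sketch under simplifying assumptions: `WireInvocation` is a hypothetical class, it does not implement Hadoop's actual `Writable` interface, and it supports only `String`, `long` and `int` arguments using a one-byte type tag of our own invention, whereas Hadoop's Invocation delegates argument encoding to its own object-serialization helpers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical Writable-style value object: a method name plus its arguments.
// To stay short it only supports String, long and int arguments.
final class WireInvocation {
    String methodName;
    Object[] parameters;

    WireInvocation() { } // empty instance to be filled by readFields

    WireInvocation(String methodName, Object... parameters) {
        this.methodName = methodName;
        this.parameters = parameters;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(methodName);
        out.writeInt(parameters.length);
        for (Object p : parameters) {
            // One type tag per argument so the reader knows how to decode it.
            if (p instanceof String) { out.writeByte('S'); out.writeUTF((String) p); }
            else if (p instanceof Long) { out.writeByte('J'); out.writeLong((Long) p); }
            else if (p instanceof Integer) { out.writeByte('I'); out.writeInt((Integer) p); }
            else throw new IOException("unsupported argument: " + p);
        }
    }

    void readFields(DataInput in) throws IOException {
        methodName = in.readUTF();
        parameters = new Object[in.readInt()];
        for (int i = 0; i < parameters.length; i++) {
            int tag = in.readByte();
            switch (tag) {
                case 'S': parameters[i] = in.readUTF(); break;
                case 'J': parameters[i] = in.readLong(); break;
                case 'I': parameters[i] = in.readInt(); break;
                default: throw new IOException("unknown type tag: " + tag);
            }
        }
    }

    // Serialize to bytes and read back, as sender and receiver would.
    static WireInvocation roundTrip(WireInvocation inv) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            inv.write(new DataOutputStream(bytes));
            WireInvocation copy = new WireInvocation();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WireInvocation copy = roundTrip(new WireInvocation("sendHeartbeat", "dn-1", 100L, 60L, 2));
        System.out.println(copy.methodName + " " + Arrays.toString(copy.parameters));
        // sendHeartbeat [dn-1, 100, 60, 2]
    }
}
```

The no-argument constructor mirrors the Writable convention: the receiver creates an empty instance and lets `readFields` populate it.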
6. Heartbeat internals, part 4: the call method of the Client class
Next, let's focus on the call method of the Client class:
public Writable call(Writable param, InetSocketAddress addr,
                     Class protocol, UserGroupInformation ticket)
    throws InterruptedException, IOException {
  Call call = new Call(param);   // wrap the Invocation (param) in a Call
  // obtain a connection to the remote server
  Connection connection = getConnection(addr, protocol, ticket, call);
  connection.sendParam(call);    // send the "serialized" call to the server
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();             // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        throw wrapException(addr, call.error);
      }
    } else {
      return call.value;         // return the result
    }
  }
}
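The heart of call is a classic guarded wait: the calling thread sleeps on the Call object until another thread (the connection's receiver) marks it done. Two details are worth noting: the `while (!call.done)` loop protects against spurious wake-ups, and an interrupt is remembered and re-asserted afterwards instead of abandoning a call that is already on the wire. The sketch below isolates this pattern. `PendingCall` and `simulateRoundTrip` are hypothetical names, and `notifyAll` is this sketch's choice for waking the waiter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Client.Call: the caller blocks on it until another
// thread (the connection's receiver) stores either a value or an error.
final class PendingCall {
    private boolean done;
    private Object value;
    private IOException error;

    synchronized void setValue(Object v) {
        value = v;
        done = true;
        notifyAll(); // wake the thread blocked in await()
    }

    synchronized void setError(IOException e) {
        error = e;
        done = true;
        notifyAll();
    }

    // Same pattern as Client.call: loop until done (guards against spurious
    // wake-ups), remember an interrupt instead of abandoning the call, then
    // restore the interrupt flag and return the value or throw the error.
    synchronized Object await() throws IOException {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (error != null) {
            throw error;
        }
        return value;
    }

    // Simulates one round trip: a receiver thread delivers the response later.
    static Object simulateRoundTrip(Object response) {
        PendingCall call = new PendingCall();
        Thread receiver = new Thread(() -> {
            try {
                Thread.sleep(50); // pretend the server takes a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            call.setValue(response);
        });
        receiver.start();
        try {
            return call.await();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateRoundTrip("heartbeat reply")); // heartbeat reply
    }
}
```

If the response arrives before the caller starts waiting, `done` is already true and `await()` returns at once, so no notification is lost.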
7. Now it all becomes clear
The process by which a datanode sends a heartbeat to the namenode is as follows:

a) When the datanode initializes, it obtains a proxy for the namenode.
b) On the datanode, the heartbeat method of the namenode proxy is called:
   namenode.sendHeartbeat(dnRegistration,
                          data.getCapacity(),
                          data.getDfsUsed(),
                          data.getRemaining(),
                          xmitsInProgress.get(),
                          getXceiverCount());
c) On the datanode, the namenode dynamic proxy wraps this call into (or "serializes it into") an Invocation object and calls client.call.
d) Client.call converts the Invocation into a Call object.
e) The client sends the Call to the actual namenode server.
f) The namenode receives it, converts it into a namenode-side Call, processes it, and sends the result back through the Responder.
g) The datanode receives the result and converts it into a DatanodeCommand[].
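Steps e) to g) hide one detail: several threads can share one connection to the namenode, so when a response comes back the client must find the Call it belongs to. As we understand Hadoop's Client, each Call carries an id, and the Connection keeps a table of outstanding calls keyed by that id; the receiving thread reads the id from the response, looks up the Call, and completes it. The sketch below shows that id-matching idea. `MultiplexedConnection` is a hypothetical class, and it swaps in `CompletableFuture` for the wait/notify Call object purely for brevity.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of one connection carrying several outstanding calls:
// each request is tagged with an id, and the receiver matches each response
// to its waiting call by that id. CompletableFuture replaces the wait/notify
// Call object purely for brevity.
final class MultiplexedConnection {
    private final AtomicInteger counter = new AtomicInteger();
    private final Map<Integer, CompletableFuture<Object>> calls = new ConcurrentHashMap<>();

    // Register an outgoing call; the id would be written ahead of the request bytes.
    int register(CompletableFuture<Object> result) {
        int id = counter.getAndIncrement();
        calls.put(id, result);
        return id;
    }

    // Called by the receiver thread for each (id, value) response read off the socket.
    boolean receiveResponse(int id, Object value) {
        CompletableFuture<Object> call = calls.remove(id);
        if (call == null) {
            return false; // unknown id, or the call was already completed
        }
        call.complete(value);
        return true;
    }

    int pending() {
        return calls.size();
    }

    public static void main(String[] args) {
        MultiplexedConnection conn = new MultiplexedConnection();
        CompletableFuture<Object> first = new CompletableFuture<>();
        CompletableFuture<Object> second = new CompletableFuture<>();
        int firstId = conn.register(first);
        int secondId = conn.register(second);
        // Responses may arrive in a different order than the requests were sent.
        conn.receiveResponse(secondId, "reply-2");
        conn.receiveResponse(firstId, "reply-1");
        System.out.println(first.join() + " " + second.join()); // reply-1 reply-2
    }
}
```

Matching by id rather than by arrival order is what allows responses to come back out of order on a shared connection.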