DatanodeRegistration dnRegistration:DatanodeRegistration class conatins all information the Namenode needs to identify and verify a Datanode when it contacts the Namenode.后续的sendHeartbeat(),blockReceived(),blockReport()方法第一个参数就是DatanodeRegistration 。
startDataNode():1.根据conf对象初始化各个ip port;2.初始化DataStorage;3.初始化DatanodeRegistration;4.初始化DatanodeProtocol;5.handshake() get version and id info from the name-node,实际调用namenode.versionRequest();6.DataStorage.recoverTransitionRead(...)检查文件系统的状态并做恢复;7.初始化FSDatasetInterface;8.初始化DataBlockScanner;9.创建serverSocket,据此初始化DataXceiverServer;10.HttpServer,配置参数然后启动;11.初始化ipcServer;
接3,runDatanodeDaemon():注册DataNode;启动DataNode线程。
public static void runDatanodeDaemon(DataNode dn) throws IOException {
if (dn != null) {
dn.register();
dn.dataNodeThread = new Thread(dn, dnThreadName);
dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
dn.dataNodeThread.start();
}
}
private void register() throws IOException {
dnRegistration = namenode.register(dnRegistration);
}
进入run():启动DataXceiverServer线程;启动ipcServer;startDistributedUpgradeIfNeeded() Start distributed upgrade if it should be initiated by the data-node;offerService()开始工作。
public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
dataXceiverServer.start();
ipcServer.start();
while (shouldRun) {
startDistributedUpgradeIfNeeded();
offerService();
}
offerService():1.定期heartBeatInterval调用namenode.sendHeartbeat()发送DataNode信息的心跳,并执行带回的DatanodeCommand;2.一直调用namenode.blockReceived()向Namenode报告最近接收到的数据块、要删除的多余块副本;3.定期调用namenode.blockReport()发送block report 告诉NameNode此DataNode上的block信息,并执行带回的DatanodeCommand。4.启动blockScannerThread线程。
public void offerService() throws Exception {