Hadoop Study Notes 21: Hadoop HDFS DataNode Source Code
I. DataNode class diagram
II. DataNode field descriptions
[*] DatanodeProtocol namenode: the RPC proxy. RPC.getProxy(nameNodeAddress) returns a proxy for the remote NameNode. The later calls versionRequest(), register(), sendHeartbeat(), blockReceived(), and blockReport() are all invoked on namenode.
[*]org.apache.hadoop.ipc.Server ipcServer: the RPC server. The configuration sets ipcAddr=0.0.0.0:50020, and ipcServer = RPC.getServer(this, ipcAddr.getHostName()...). The DataNode thereby acts as an RPC server and exposes the services defined in the ClientDatanodeProtocol and InterDatanodeProtocol interfaces.
[*]HttpServer infoServer: an embedded Jetty server that answers HTTP requests.
[*]DataStorage storage: see http://zy19982004.iyunv.com/admin/blogs/1878758.
[*]DatanodeRegistration dnRegistration: the DatanodeRegistration class contains all the information the NameNode needs to identify and verify a DataNode when it contacts the NameNode. It is the first argument of the later sendHeartbeat(), blockReceived(), and blockReport() calls.
[*]FSDatasetInterface data: the interface implemented by FSDataset; it manages all blocks on the DataNode. See http://zy19982004.iyunv.com/admin/blogs/1880303.
[*]DataBlockScanner blockScanner: periodically verifies the block files.
[*]DataXceiverServer dataXceiverServer: see http://zy19982004.iyunv.com/admin/blogs/1881117.
[*]LinkedList<Block> receivedBlockList: new blocks that have been created successfully.
[*]LinkedList<String> delHints: the nodes from which the corresponding block may be deleted. After DataXceiver.replaceBlock() and writeBlock succeed, they call DataNode.notifyNamenodeReceivedBlock(), which appends one value to each of these two lists.
protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
  synchronized (receivedBlockList) {
    synchronized (delHints) {
      receivedBlockList.add(block);
      delHints.add(delHint);
      receivedBlockList.notifyAll();
    }
  }
}
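The two lists behave like a small producer/consumer queue: writers append a block and its hint under both locks, and a reporting thread waits on the block list before draining both together. The following is a minimal sketch of that pattern only. `ReceivedBlockQueue`, `add`, and `drain` are hypothetical names, and plain `Long` ids stand in for Hadoop's `Block` type.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for the DataNode's pending-report state; not Hadoop code.
class ReceivedBlockQueue {
    // Parallel lists: blockIds.get(i) pairs with delHints.get(i).
    private final LinkedList<Long> blockIds = new LinkedList<>();
    private final LinkedList<String> delHints = new LinkedList<>();

    // Producer side: same locking order as notifyNamenodeReceivedBlock().
    void add(long blockId, String delHint) {
        synchronized (blockIds) {
            synchronized (delHints) {
                blockIds.add(blockId);
                delHints.add(delHint);
                blockIds.notifyAll(); // wake a reporter waiting on blockIds
            }
        }
    }

    // Consumer side: optionally waits up to timeoutMs for work, then drains
    // both lists in step so each block stays paired with its hint.
    List<String> drain(long timeoutMs) {
        List<String> out = new ArrayList<>();
        synchronized (blockIds) {
            if (blockIds.isEmpty() && timeoutMs > 0) {
                try {
                    blockIds.wait(timeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            synchronized (delHints) {
                while (!blockIds.isEmpty()) {
                    out.add(blockIds.removeFirst() + ":" + delHints.removeFirst());
                }
            }
        }
        return out;
    }
}
```

Both methods take the locks in the same order (block list first, hints second), which is what keeps the nested `synchronized` blocks from deadlocking.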
III. Reading the code in execution order
[*] main(): the entry point.
public static void main(String args[]) {
  secureMain(args, null);
}
[*]secureMain(): creates the DataNode, then joins its thread so the caller blocks until the DataNode exits.
public static void secureMain(String [] args, SecureResources resources) {
  // error handling omitted in this excerpt
  DataNode datanode = createDataNode(args, null, resources);
  if (datanode != null)
    datanode.join();
}
[*]createDataNode(): instantiates the DataNode and starts its thread.
public static DataNode createDataNode(String args[],
    Configuration conf, SecureResources resources) throws IOException {
  DataNode dn = instantiateDataNode(args, conf, resources);
  runDatanodeDaemon(dn);
  return dn;
}
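The chain above (instantiate, start a thread, then join it from main) can be sketched in miniature. This is only an illustration under simplifying assumptions: `MiniNode` and its methods are hypothetical, and the loop stops itself after three rounds so the example terminates.

```java
// Hypothetical miniature of the main() -> createDataNode() -> join() chain.
class MiniNode implements Runnable {
    private volatile boolean shouldRun = true;
    private Thread nodeThread;
    private int iterations = 0;

    // Plays the role of createDataNode(): build the object, then start its thread.
    static MiniNode create() {
        MiniNode node = new MiniNode();
        node.nodeThread = new Thread(node, "MiniNode");
        node.nodeThread.setDaemon(true);
        node.nodeThread.start();
        return node;
    }

    @Override
    public void run() {
        while (shouldRun) {
            iterations++;
            if (iterations >= 3) {
                shouldRun = false; // stop after a few rounds so the demo ends
            }
        }
    }

    // Blocks the caller until the service thread has exited.
    void join() {
        try {
            nodeThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int iterations() {
        return iterations;
    }

    public static void main(String[] args) {
        MiniNode node = create();
        if (node != null) {
            node.join();
        }
        System.out.println("iterations = " + node.iterations());
    }
}
```

Because the worker is a daemon thread, the JVM would exit as soon as main returned; the `join()` call is what keeps the process alive while the service runs.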
[*]instantiateDataNode(): reads dataDirs and calls makeInstance(). makeInstance(): checks dataDirs and constructs a new DataNode. new DataNode(): calls startDataNode().
public static DataNode instantiateDataNode(String args[],
                                           Configuration conf,
                                           SecureResources resources) throws IOException {
  String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
  return makeInstance(dataDirs, conf, resources);
}
public static DataNode makeInstance(String[] dataDirs, Configuration conf,
    SecureResources resources) throws IOException {
  // excerpt: the setup of localFS, dataDirPermission and dirs is omitted;
  // the check below is applied to each entry (dir) of dataDirs
  DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
  return new DataNode(conf, dirs, resources);
}
DataNode(final Configuration conf,
         final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
  startDataNode(conf, dataDirs, resources);
}
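The directory check in makeInstance() amounts to filtering the configured paths down to the usable ones before the DataNode is constructed. Below is a rough sketch of that idea using only `java.io.File`. `DataDirCheck` and `usableDirs` are hypothetical names, and unlike DiskChecker.checkDir() this version does not compare against an expected permission setting.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of the data-directory check; not Hadoop code.
class DataDirCheck {
    // Returns the configured directories that exist and are readable and writable.
    static List<File> usableDirs(String[] dataDirs) {
        List<File> dirs = new ArrayList<>();
        for (String dir : dataDirs) {
            File f = new File(dir);
            if (f.isDirectory() && f.canRead() && f.canWrite()) {
                dirs.add(f);
            } else {
                System.err.println("Invalid directory: " + dir);
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        String tmp = System.getProperty("java.io.tmpdir");
        List<File> dirs = usableDirs(new String[] { tmp, "/no/such/dir" });
        System.out.println("usable directories: " + dirs);
    }
}
```

A caller would typically refuse to start when the returned list is empty, since a DataNode without any storage directory has nothing to serve.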
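[*]startDataNode(): 1. initializes the IP addresses and ports from the conf object; 2. initializes DataStorage; 3. initializes DatanodeRegistration; 4. initializes the DatanodeProtocol proxy; 5. handshake() gets version and ID information from the NameNode, which in practice calls namenode.versionRequest(); 6. DataStorage.recoverTransitionRead(...) checks the state of the file system and performs recovery; 7. initializes FSDatasetInterface; 8. initializes DataBlockScanner; 9. creates a ServerSocket and uses it to initialize DataXceiverServer; 10. configures and starts the HttpServer; 11. initializes ipcServer.
[*]Continuing from step 3 (createDataNode()), runDatanodeDaemon(): registers the DataNode, then starts the DataNode thread.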
public static void runDatanodeDaemon(DataNode dn) throws IOException {
  if (dn != null) {
    dn.register();
    dn.dataNodeThread = new Thread(dn, dnThreadName);
    dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
    dn.dataNodeThread.start();
  }
}
private void register() throws IOException {
  dnRegistration = namenode.register(dnRegistration);
}
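The call namenode.register(...) looks local, but namenode is a proxy that forwards each method call to the remote NameNode. A minimal sketch of that idea, assuming the proxy behaves like a standard Java dynamic proxy: `MiniDatanodeProtocol`, `ProxySketch`, and the string-based registration are hypothetical, and the handler only records the call instead of sending it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-reduced protocol; the real DatanodeProtocol is richer.
interface MiniDatanodeProtocol {
    String register(String registration);
}

class ProxySketch {
    // Records every forwarded call so the behavior can be inspected.
    static final List<String> calls = new ArrayList<>();

    static MiniDatanodeProtocol getProxy(String nameNodeAddress) {
        // Only protocol methods are handled here; Object methods such as
        // toString() are not supported by this sketch.
        InvocationHandler handler = (proxy, method, args) -> {
            // A real RPC client would serialize the call and send it to nameNodeAddress.
            calls.add(method.getName() + "@" + nameNodeAddress);
            return args[0] + "#registered";
        };
        return (MiniDatanodeProtocol) Proxy.newProxyInstance(
                MiniDatanodeProtocol.class.getClassLoader(),
                new Class<?>[] { MiniDatanodeProtocol.class },
                handler);
    }

    public static void main(String[] args) {
        MiniDatanodeProtocol namenode = getProxy("nn:9000");
        System.out.println(namenode.register("dn-1"));
        System.out.println(calls);
    }
}
```

The caller sees only the interface, which is why the DataNode code reads the same whether the NameNode is remote or, in a test, a local stub.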
[*]run(): starts the DataXceiverServer thread; starts ipcServer; startDistributedUpgradeIfNeeded() starts a distributed upgrade if it should be initiated by the DataNode; offerService() begins the actual work.
public void run() {
  LOG.info(dnRegistration + "In DataNode.run, data = " + data);
  dataXceiverServer.start();
  ipcServer.start();
  while (shouldRun) {
    // error handling omitted in this excerpt
    startDistributedUpgradeIfNeeded();
    offerService();
  }
}
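The outer while (shouldRun) loop means the service is re-entered whenever offerService() returns. A small sketch of such a supervising loop follows, assuming failures are caught and the service is simply retried. `RunLoopSketch` is hypothetical, and its offerService() fails twice on purpose before asking the loop to stop.

```java
// Hypothetical supervising loop; not Hadoop code.
class RunLoopSketch {
    private volatile boolean shouldRun = true;
    int attempts = 0;
    int failures = 0;

    // Stand-in service body: fails twice, then clears shouldRun.
    private void offerService() throws Exception {
        attempts++;
        if (attempts <= 2) {
            throw new Exception("simulated failure " + attempts);
        }
        shouldRun = false;
    }

    void run() {
        while (shouldRun) {
            try {
                offerService();
            } catch (Exception ex) {
                failures++; // a long-running daemon would log and back off here
            }
        }
    }

    public static void main(String[] args) {
        RunLoopSketch loop = new RunLoopSketch();
        loop.run();
        System.out.println("attempts=" + loop.attempts + " failures=" + loop.failures);
    }
}
```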
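[*]offerService(): 1. every heartBeatInterval, calls namenode.sendHeartbeat() to send a heartbeat carrying this DataNode's status, and executes the DatanodeCommand array that comes back; 2. keeps calling namenode.blockReceived() to report to the NameNode the blocks received recently and the redundant replicas that can be deleted; 3. periodically calls namenode.blockReport() to send a block report telling the NameNode which blocks this DataNode holds, and executes the DatanodeCommand that comes back; 4. starts the blockScannerThread thread.
public void offerService() throws Exception {
  while (shouldRun) {
    try {
      long startTime = now();
      if (startTime - lastHeartbeat > heartBeatInterval) {
        lastHeartbeat = startTime;
        // 1. send a heartbeat and execute the returned commands
        DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());
        processCommand(cmds);
      }
      // 2. report recently received blocks and their deletion hints
      namenode.blockReceived(dnRegistration, blockArray, delHintArray);
      // once blockReceived succeeds, the reported entries are removed
      // from receivedBlockList and delHints
      if (startTime - lastBlockReport > blockReportInterval) {
        // 3. send a full block report and execute the returned command
        DatanodeCommand cmd = namenode.blockReport(dnRegistration,
            BlockListAsLongs.convertToArrayLongs(bReport));
        processCommand(cmd);
      }
      // 4. start the block scanner thread
      blockScannerThread = new Daemon(blockScanner);
      blockScannerThread.start();
    } catch (IOException e) {
      // exception handling omitted in this excerpt
    }
  } // while (shouldRun)
} // offerService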
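The timing logic in offerService() is a single loop that compares the current time against two "last sent" timestamps. The sketch below isolates that scheduling with an injected clock so it can be stepped deterministically. `IntervalSketch` and `tick` are hypothetical, the log entries stand in for the RPC calls, and updating lastBlockReport right after a report is an assumption, since the excerpt does not show that assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the heartbeat / block report timing; not Hadoop code.
class IntervalSketch {
    final long heartBeatInterval;
    final long blockReportInterval;
    long lastHeartbeat = 0;
    long lastBlockReport = 0;
    final List<String> log = new ArrayList<>();

    IntervalSketch(long heartBeatInterval, long blockReportInterval) {
        this.heartBeatInterval = heartBeatInterval;
        this.blockReportInterval = blockReportInterval;
    }

    // One pass of the loop body at the given time in milliseconds.
    void tick(long now) {
        if (now - lastHeartbeat > heartBeatInterval) {
            lastHeartbeat = now;
            log.add("heartbeat@" + now);
        }
        if (now - lastBlockReport > blockReportInterval) {
            lastBlockReport = now; // assumption: reset after each report
            log.add("blockReport@" + now);
        }
    }

    public static void main(String[] args) {
        IntervalSketch s = new IntervalSketch(3000, 10000);
        for (long t = 0; t <= 12000; t += 1000) {
            s.tick(t);
        }
        System.out.println(s.log);
    }
}
```

Because both checks share one loop, a slow block report delays the next heartbeat check, which is one reason the two intervals are usually far apart.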