设为首页 收藏本站
查看: 1004|回复: 0

[经验分享] Hadoop学习二十一:Hadoop-Hdfs DataNode 源码

[复制链接]

尚未签到

发表于 2016-12-8 11:12:37 | 显示全部楼层 |阅读模式
一. DataNode类图
DSC0000.jpg

二. DateNode属性说明


  •  DatanodeProtocol namenode:RPC代理类。RPC.getProxy(nameNodeAddress)得到远程NameNode代理类。后续的versionRequest(),register(),sendHeartbeat(),blockReceived(),blockReport()都是namenode的行为。
  • org.apache.hadoop.ipc.Server ipcServer:RPC Server。conf里配置ipcAddr=0.0.0.0:50020,ipcServer = RPC.getServer(this, ipcAddr.getHostName()...),此DataNode作为RPC Server提供远程服务,提供接口ClientDatanodeProtocol和InterDatanodeProtocol中定义的服务。
  • HttpServer infoServer:a Jetty embedded server to answer http requests。
  • DataStorage storage:参考http://zy19982004.iyunv.com/admin/blogs/1878758。
  • DatanodeRegistration dnRegistration:DatanodeRegistration class conatins all information the Namenode needs to identify and verify a Datanode when it contacts the Namenode.后续的sendHeartbeat(),blockReceived(),blockReport()方法第一个参数就是DatanodeRegistration 。
  • FSDatasetInterface data:FSDataset的接口,管理着DataNode的所有block。参考http://zy19982004.iyunv.com/admin/blogs/1880303。
  • DataBlockScanner blockScanner:定时对数据块文件进行校验。
  • DataXceiverServer dataXceiverServer:参考http://zy19982004.iyunv.com/admin/blogs/1881117。
  • LinkedList<Block> receivedBlockList:成功创建的新block。
  • LinkedList<String> delHints:可以删掉该数据块的节点。DataXceiver.replaceBlock()和writeBlock成功后会调用DataNode.notifyNamenodeReceivedBlock()方法给这两个List add value.
    protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
    synchronized (receivedBlockList) {
    synchronized (delHints) {
    receivedBlockList.add(block);
    delHints.add(delHint);
    receivedBlockList.notifyAll();
    }
    }
    }
     

三. 代码顺序阅读


  •  main()函数入口。
      public static void main(String args[]) {
    secureMain(args, null);
    }
  • secureMain():初始化DataNode;加入线程里。
    public static void secureMain(String [] args, SecureResources resources) {
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null)
    datanode.join();
    }
  • createDataNode():初始化DataNode;启动该线程。
      public static DataNode createDataNode(String args[],
    Configuration conf, SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);
    runDatanodeDaemon(dn);
    return dn;
    }
  • instantiateDataNode():读取dataDirs;调用makeInstance()。makeInstance():check dataDirs;new DateNode。new DataNode():调用startDataNode()。
    public static DataNode instantiateDataNode(String args[],
    Configuration conf,
    SecureResources resources) throws IOException {
    String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
    return makeInstance(dataDirs, conf, resources);
    }
    public static DataNode makeInstance(String[] dataDirs, Configuration conf,
    SecureResources resources) throws IOException {
    DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
    return new DataNode(conf, dirs, resources);
    }
    DataNode(final Configuration conf,
    final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    startDataNode(conf, dataDirs, resources);
    }
  • startDataNode():1.根据conf对象初始化各个ip port;2.初始化DataStorage;3.初始化DatanodeRegistration;4.初始化DatanodeProtocol;5.handshake() get version and id info from the name-node,实际调用namenode.versionRequest();6.DataStorage.recoverTransitionRead(...)检查文件系统的状态并做恢复;7.初始化FSDatasetInterface;8.初始化DataBlockScanner;9.创建serverSocket,据此初始化DataXceiverServer;10.HttpServer,配置参数然后启动;11.初始化ipcServer;
  • 接3,runDatanodeDaemon():注册DataNode;启动DataNode线程。
      public static void runDatanodeDaemon(DataNode dn) throws IOException {
    if (dn != null) {
    dn.register();
    dn.dataNodeThread = new Thread(dn, dnThreadName);
    dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
    dn.dataNodeThread.start();
    }
    }
    private void register() throws IOException {
    dnRegistration = namenode.register(dnRegistration);

  • 进入run():启动DataXceiverServer线程;启动ipcServer;startDistributedUpgradeIfNeeded() Start distributed upgrade if it should be initiated by the data-node;offerService()开始工作。
      public void run() {
    LOG.info(dnRegistration + "In DataNode.run, data = " + data);
    dataXceiverServer.start();
    ipcServer.start();
    while (shouldRun) {
    startDistributedUpgradeIfNeeded();
    offerService();
    }
  • offerService():1.定期heartBeatInterval调用namenode.sendHeartbeat()发送DataNode信息的心跳,并执行带回的DatanodeCommand;2.一直调用namenode.blockReceived()向Namenode报告最近接收到的数据块、要删除的多余块副本;3.定期调用namenode.blockReport()发送block report 告诉NameNode此DataNode上的block信息,并执行带回的DatanodeCommand。4.启动blockScannerThread线程。
    public void offerService() throws Exception {

    while (shouldRun) {
    try {
    long startTime = now();
    if (startTime - lastHeartbeat > heartBeatInterval) {
    lastHeartbeat = startTime;
    1.DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
    data.getCapacity(),
    data.getDfsUsed(),
    data.getRemaining(),
    xmitsInProgress.get(),
    getXceiverCount());
    processCommand(cmds);
    }
    2.namenode.blockReceived(dnRegistration, blockArray, delHintArray);
    blockReceived成功后删除receivedBlockListhe delHints里的数据
    if (startTime - lastBlockReport > blockReportInterval) {
    3.DatanodeCommand cmd = namenode.blockReport(dnRegistration,
    BlockListAsLongs.convertToArrayLongs(bReport));
    processCommand(cmd);
    }
    blockScannerThread = new Daemon(blockScanner);
    blockScannerThread.start();
    } // while (shouldRun)
    } // offerService


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311453-1-1.html 上篇帖子: MAC下hadoop开发环境搭建系列(二) 下篇帖子: HADOOP的学习笔记 (第三期) eclipse 配置hadoop开发环境
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表