设为首页 收藏本站
查看: 558|回复: 0

[经验分享] Hadoop学习十五:Hadoop-Hdfs DataXceiverServer源码概述

[复制链接]

尚未签到

发表于 2016-12-8 09:48:19 | 显示全部楼层 |阅读模式
一.DataXceiverServer类图
DSC0000.jpg

二.DataXceiverServer


  •  Server used for receiving/sending a block of data.This is created to listen for requests from clients or other DataNodes.  This small server does not use the Hadoop IPC mechanism.DataXceiverServer用于接收和发送block数据,它监听着client或者其它DataNode的请求。DataXceiverServer没有采用RPC机制。DataXceiverServer是流式机制,而RPC是命令式接口。
  • DataXceiverServer每接收到一个请求,就会创建一个DataXceiver来处理该请求。
    class DataXceiverServer implements Runnable, FSConstants {
    public static final Log LOG = DataNode.LOG;
    ServerSocket ss;
    DataNode datanode;
    // Record all sockets opend for data transfer
    Map<Socket, Socket> childSockets = Collections.synchronizedMap(new HashMap<Socket, Socket>());
    static final int MAX_XCEIVER_COUNT = 256;//默认值是256,每个node最多可以起多少个DataXceiver,如果太多的话可能会导致内存不足
    BlockBalanceThrottler balanceThrottler;
    DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) {
    this.ss = ss;
    this.datanode = datanode;
    }
    public void run() {
    while (datanode.shouldRun) {
    Socket s = ss.accept();//接受client请求
    s.setTcpNoDelay(true);
    //实例化DataXceiver,需要Socket和DataNode
    new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start();
    }
    }
    }

三.DataXceiver类图


  •  DataXceiver处理Client或DataNode的五种请求(DataTransferProtocol接口定义)。
      public static final byte OP_WRITE_BLOCK = (byte) 80;
    public static final byte OP_READ_BLOCK = (byte) 81;
    /**
    * @deprecated As of version 15, OP_READ_METADATA is no longer supported
    */
    @Deprecated public static final byte OP_READ_METADATA = (byte) 82;
    public static final byte OP_REPLACE_BLOCK = (byte) 83;
    public static final byte OP_COPY_BLOCK = (byte) 84;
    public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
     
    class DataXceiver implements Runnable, FSConstants {
    Socket s;
    final String remoteAddress; // address of remote side
    final String localAddress; // local address of this daemon
    DataNode datanode;
    DataXceiverServer dataXceiverServer;
    public DataXceiver(Socket s, DataNode datanode,
    DataXceiverServer dataXceiverServer) {
    this.s = s;
    this.datanode = datanode;
    this.dataXceiverServer = dataXceiverServer;
    dataXceiverServer.childSockets.put(s, s);
    remoteAddress = s.getRemoteSocketAddress().toString();
    localAddress = s.getLocalSocketAddress().toString();
    }
    public void run() {
    DataInputStream in = null;
    in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));
    short version = in.readShort();if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
            throw new IOException( "Version Mismatch" );
          }//version
    boolean local = s.getInetAddress().equals(s.getLocalAddress());//本地请求
    byte op = in.readByte();//op
    switch (op) {
    //读数据块
    case DataTransferProtocol.OP_READ_BLOCK:
    readBlock(in);
    datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
    if (local)
    datanode.myMetrics.incrReadsFromLocalClient();
    else
    datanode.myMetrics.incrReadsFromRemoteClient();
    break;
    //写数据块
    case DataTransferProtocol.OP_WRITE_BLOCK:
    writeBlock(in);
    datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
    if (local)
    datanode.myMetrics.incrWritesFromLocalClient();
    else
    datanode.myMetrics.incrWritesFromRemoteClient();
    break;
    //替换数据块
    case DataTransferProtocol.OP_REPLACE_BLOCK:
    replaceBlock(in);
    datanode.myMetrics
    .addReplaceBlockOp(DataNode.now() - startTime);
    break;
    //拷贝数据块
    case DataTransferProtocol.OP_COPY_BLOCK:
    // for balancing purpose; send to a proxy source
    copyBlock(in);
    datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
    break;
    //读取数据块校验码
    case DataTransferProtocol.OP_BLOCK_CHECKSUM: // get the checksum of
    // a block
    getBlockChecksum(in);
    datanode.myMetrics.addBlockChecksumOp(DataNode.now()
    - startTime);
    break;
    default:
    throw new IOException("Unknown opcode " + op
    + " in data stream");
    }
    dataXceiverServer.childSockets.remove(s);
    }
    }
     

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311330-1-1.html 上篇帖子: 在windows下配置hadoop-eclipse插件,并连接虚拟机的hadoop集群 下篇帖子: hadoop 关闭进程时报错no 进程 to stop
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表