设为首页 收藏本站
查看: 563|回复: 0

[经验分享] Hadoop学习笔记(一)——RPC

[复制链接]

尚未签到

发表于 2016-12-8 09:53:40 | 显示全部楼层 |阅读模式

Hadoop RPC学习笔记



首先RpcEngine。这个接口中了Server端的getServer和Call方法。Client端的getProxy和stopProxy的方法。


从Server端看起






RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;




这个接口这个参数的函数可以从一个调用的例子来看







this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);





这是Namenode初始化时的创建RPC Server实例的调用。实际调用的实现,是每个Protocol对应的RpcEngine的实例的实现。RpcEngine的实例怎么来的呢?是根据配置的类名(如rpc.engine.NamenodeProtocols)反射得到的。默认RpcEngine是WritableRpcEngine(hadoop还提供了一种AvroRpcEngine,跳过没看)。WritableRpcEngine的getServer没有特别,一直追溯到顶层的Server,即ipc包中的抽象Server类。到这个类中,就可以窥视到所有Server的重要元素(静态内部类),如Call,Listener,Responder,Connection,Handler。不表先,回到Server的构造函数中来。

前面无非是设置一些Server属性,地址端口,各种队列长度,read线程数等等(paramClass这个单独说)。重点是这里。



    listener = new Listener();
this.port = listener.getAddress().getPort();   
this.rpcMetrics = RpcMetrics.create(this);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);






Listener是做了实际事情的,bind端口,启动读线程等。其中有一些Java网络编程的知识点,比如ServerSocketChannel,Selector。



然后是一个Responder的实例。



    responder = new Responder();





一个Server实例就这样初始化完成了。在各种Server端的Init中就会调用他start以启动服务。start分别start responder,listener,handler。分别看。

Listener的Run循环select,对于每个select到的key,DoAccept。DoAccept主要逻辑代码如下:



Reader reader = getReader();//轮流唤醒reader线程
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);//将reader的selector注册到channel中
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}













reader线程唤醒后,根据attach再seletkey上得connection提供的readAndProcess方法,读取到数据后,根据server注册的paramClass类,生成方法和参数对象,根据这些对象实例化一个Call实例,放入Call队列中。

Call类不复杂,就是维护一个调用的属性。参数,连接对象,结果数据和处理时长相关的信息。





private int id;                               // the client's call id
private Writable param;                       // the parameter passed
private Connection connection;                // connection to client
private long timestamp;     // the time received when response is null
// the time served when response is not null
private ByteBuffer response;                      // the response for this call

  
  








自然会有一个线程再来消费队列中的Call,就是Handler,handler线程是在Server Start时启动的。handler取出每个Call对象后,传入Call对象中维护的方法名称和参数,调用当前server实例实现的call方法,将调用invoke。(其中有很多java 反射的东西)。handler得到call返回的结果后,填入Call对象。然后就是调用注册的Responder的doRespond方法了。




Responder在Server实例化时实例化,在Server Start时启动,是一个daemon线程。暴露的外部方法。





void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}





这里就又涉及到一个生产者消费者队列,为了完成异步的Response。当该call是首次产生response,会调用processResponse,在这个方法中,会取出队列中的第一个call,将call的reponse写入call的connection中维护的channel中,并且会判断是否写完,若没有写完,就会将这个call重新加入队列,并会注册关注可写的selector给channel。在Response的主循环中,当有可写selectedKeys时,调用doAsyncWrite方法继续写,而doAsyncWrite还是会调用processResponse方法的。如此循环往复,直至写完。并且,主循环中会判断时长已超过15分钟的链接,主动closeconnection。

  










大致就是这样。Server的几个重要组成部分就是Listener,Handler,Responder,Connection和Call。主要涉及到的知识点有两部分:Java NIO和反射。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311337-1-1.html 上篇帖子: Hadoop Hive sql语法详解 下篇帖子: Chapter 1. Meet Hadoop
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表