Hadoop RPC学习笔记
首先RpcEngine。这个接口中了Server端的getServer和Call方法。Client端的getProxy和stopProxy的方法。
从Server端看起
RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;
这个接口这个参数的函数可以从一个调用的例子来看
this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
这是Namenode初始化时的创建RPC Server实例的调用。实际调用的实现,是每个Protocol对应的RpcEngine的实例的实现。RpcEngine的实例怎么来的呢?是根据配置的类名(如rpc.engine.NamenodeProtocols)反射得到的。默认RpcEngine是WritableRpcEngine(hadoop还提供了一种AvroRpcEngine,跳过没看)。WritableRpcEngine的getServer没有特别,一直追溯到顶层的Server,即ipc包中的抽象Server类。到这个类中,就可以窥视到所有Server的重要元素(静态内部类),如Call,Listener,Responder,Connection,Handler。不表先,回到Server的构造函数中来。
前面无非是设置一些Server属性,地址端口,各种队列长度,read线程数等等(paramClass这个单独说)。重点是这里。
listener = new Listener();
this.port = listener.getAddress().getPort();
this.rpcMetrics = RpcMetrics.create(this);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
Listener是做了实际事情的,bind端口,启动读线程等。其中有一些Java网络编程的知识点,比如ServerSocketChannel,Selector。
然后是一个Responder的实例。
responder = new Responder();
一个Server实例就这样初始化完成了。在各种Server端的Init中就会调用他start以启动服务。start分别start responder,listener,handler。分别看。
Listener的Run循环select,对于每个select到的key,DoAccept。DoAccept主要逻辑代码如下:
Reader reader = getReader();//轮流唤醒reader线程
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);//将reader的selector注册到channel中
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
reader线程唤醒后,根据attach再seletkey上得connection提供的readAndProcess方法,读取到数据后,根据server注册的paramClass类,生成方法和参数对象,根据这些对象实例化一个Call实例,放入Call队列中。
Call类不复杂,就是维护一个调用的属性。参数,连接对象,结果数据和处理时长相关的信息。
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
private long timestamp; // the time received when response is null
// the time served when response is not null
private ByteBuffer response; // the response for this call
自然会有一个线程再来消费队列中的Call,就是Handler,handler线程是在Server Start时启动的。handler取出每个Call对象后,传入Call对象中维护的方法名称和参数,调用当前server实例实现的call方法,将调用invoke。(其中有很多java 反射的东西)。handler得到call返回的结果后,填入Call对象。然后就是调用注册的Responder的doRespond方法了。
Responder在Server实例化时实例化,在Server Start时启动,是一个daemon线程。暴露的外部方法。
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}
这里就又涉及到一个生产者消费者队列,为了完成异步的Response。当该call是首次产生response,会调用processResponse,在这个方法中,会取出队列中的第一个call,将call的reponse写入call的connection中维护的channel中,并且会判断是否写完,若没有写完,就会将这个call重新加入队列,并会注册关注可写的selector给channel。在Response的主循环中,当有可写selectedKeys时,调用doAsyncWrite方法继续写,而doAsyncWrite还是会调用processResponse方法的。如此循环往复,直至写完。并且,主循环中会判断时长已超过15分钟的链接,主动closeconnection。
大致就是这样。Server的几个重要组成部分就是Listener,Handler,Responder,Connection和Call。主要涉及到的知识点有两部分:Java NIO和反射。
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com