13916729435 发表于 2016-12-12 10:04:21

Hadoop中的RPC机制分析(基于Java NIO机制)

前言:

这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoop RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baike.baidu.com/view/32726.htm)机制分析的博客一直耽搁了下来。昨天晚上胡老大和我抱怨说:最近乱的很。呵呵,老是往武汉跑,能不乱嘛。不过差不多腾讯面试的事就该告一段落了。五一期间,云计算小组的成员们,我们再搞起来吧。记住,我们还有一本hadoop的手册没出来呢。胡老大已经答应给我们写提纲了,在这期间,我们还是先把内功再修炼修炼吧。

分析对象:
hadoop版本:hadoop 0.20.203.0

必备技术点:
1. 动态代理(参考:http://blog.csdn.net/jiangwei0910410003/article/details/21153787)
2. Java NIO(参考:http://blog.csdn.net/jiangwei0910410003/article/details/21153613)
3. Java网络编程

目录:
一.RPC协议
二.ipc.RPC源码分析
三.ipc.Client源码分析
四.ipc.Server源码分析

分析:

一.RPC协议

在分析协议之前,我觉得我们很有必要先搞清楚协议是什么。下面我就谈一点自己的认识吧。如果你学过java的网络编程,你一定知道:当客户端发送一个字节给服务端时,服务端必须也要有一个读字节的方法在阻塞等待;反之亦然。 这种我把它称为底层的通信协议。可是对于一个大型的网络通信系统来说,很显然这种说法的协议粒度太小,不方便我们理解整个网络通信的流程及架构,所以我造了个说法:架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了(注:如果从架构层次的协议来分析系统,我们就先不要太在意方法的具体实现,呵呵,我相信你懂得~)。

Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口。如图:



下面就几个重点的协议介绍一下吧:


VersionedProtocol:它是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()

(1)HDFS相关
ClientDatanodeProtocol:一个客户端和datanode之间的协议接口,用于数据块恢复
ClientProtocol:client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
DatanodeProtocol: Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol:SecondaryNode与Namenode交互的接口。

(2)Mapreduce相关
InterDatanodeProtocol:Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol:TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol:JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol:Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

一下子罗列了这么多的协议,有些人可能要问了,hadoop是怎么使用它们的呢?呵呵,不要着急哦,其实本篇博客所分析的是hadoop的RPC机制底层的具体实现,而这些协议却是应用层上的东西,比如hadoop是怎么样保持“心跳”的啊。所以在我的下一篇博客:源码级分析hadoop的心跳机制中会详细说明以上协议是怎样被使用的。尽请期待哦~。现在就开始我们的RPC源码之旅吧···

二.ipc.RPC源码分析

ipc.RPC类中有一些内部类,为了大家对RPC类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:

Invocation:用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache:用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker:是动态代理中的调用实现类,继承了InvocationHandler.
Server:是ipc.Server的实现类。

从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。现在就只剩下Invoker类了。如果你对动态代理(参考:http://weixiaolu.iyunv.com/blog/1477774)比较了解的话,你一下就会想到,我们接下来去研究的就是RPC.Invoker类中的invoke()方法了。代码如下:

代码一:




[*]publicObjectinvoke(Objectproxy,Methodmethod,Object[]args)
[*]throwsThrowable{
[*]···
[*]ObjectWritablevalue=(ObjectWritable)
[*]client.call(newInvocation(method,args),remoteId);
[*]···
[*]returnvalue.get();
[*]}

呵呵,如果你发现这个invoke()方法实现的有些奇怪的话,那你就对了。一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:

代码二:




[*]ObjectWritablevalue=(ObjectWritable)
[*]client.call(newInvocation(method,args),remoteId);


Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。那我们接下来分析一下ipc.Client源码吧。不过在分析ipc.Client源码之前,为了不让我们像盲目的苍蝇一样乱撞,我想先确定一下我们分析的目的是什么,我总结出了三点需要解决的问题:

1. 客户端和服务端的连接是怎样建立的?
2. 客户端是怎样给服务端发送数据的?
3. 客户端是怎样获取服务端的返回数据的?

基于以上三个问题,我们开始吧!!!

三.ipc.Client源码分析

同样,为了对Client类有个初步的了解,我们也先罗列几个我们感兴趣的内部类:

Call:用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection:用以处理远程连接对象。继承了Thread
ConnectionId:唯一确定一个连接

问题1:客户端和服务端的连接是怎样建立的?

下面我们来看看Client类中的cal()方法吧:

代码三:




[*]publicWritablecall(Writableparam,ConnectionIdremoteId)
[*]throwsInterruptedException,IOException{
[*]Callcall=newCall(param);//将传入的数据封装成call对象
[*]Connectionconnection=getConnection(remoteId,call);//获得一个连接
[*]connection.sendParam(call);//向服务端发送call对象
[*]booleaninterrupted=false;
[*]synchronized(call){
[*]while(!call.done){
[*]try{
[*]call.wait();//等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
[*]}catch(InterruptedExceptionie){
[*]//因中断异常而终止,设置标志interrupted为true
[*]interrupted=true;
[*]}
[*]}
[*]if(interrupted){
[*]Thread.currentThread().interrupt();
[*]}
[*]
[*]if(call.error!=null){
[*]if(call.errorinstanceofRemoteException){
[*]call.error.fillInStackTrace();
[*]throwcall.error;
[*]}else{//本地异常
[*]throwwrapException(remoteId.getAddress(),call.error);
[*]}
[*]}else{
[*]returncall.value;//返回结果数据
[*]}
[*]}
[*]}


具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道RPC机制底层的网络连接是怎么建立的。呵呵,那我们只好再去深究了,分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

代码四:




[*]Connectionconnection=getConnection(remoteId,call);//获得一个连接
[*]connection.sendParam(call);//向服务端发送call对象


先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。

代码五:




[*]privateConnectiongetConnection(ConnectionIdremoteId,
[*]Callcall)
[*]throwsIOException,InterruptedException{
[*]if(!running.get()){
[*]//如果client关闭了
[*]thrownewIOException("Theclientisstopped");
[*]}
[*]Connectionconnection;
[*]//如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
[*]//但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
[*]do{
[*]synchronized(connections){
[*]connection=connections.get(remoteId);
[*]if(connection==null){
[*]connection=newConnection(remoteId);
[*]connections.put(remoteId,connection);
[*]}
[*]}
[*]}while(!connection.addCall(call));//将call对象放入对应连接中的calls池,就不贴出源码了
[*]//这句代码才是真正的完成了和服务端建立连接哦~
[*]connection.setupIOstreams();
[*]returnconnection;
[*]}

如果你还有兴趣继续分析下去,那我们就一探建立连接的过程吧,下面贴出Client.Connection类中的setupIOstreams()方法:

代码六:




[*]privatesynchronizedvoidsetupIOstreams()throwsInterruptedException{
[*]···
[*]try{
[*]···
[*]while(true){
[*]setupConnection();//建立连接
[*]InputStreaminStream=NetUtils.getInputStream(socket);//获得输入流
[*]OutputStreamoutStream=NetUtils.getOutputStream(socket);//获得输出流
[*]writeRpcHeader(outStream);
[*]···
[*]this.in=newDataInputStream(newBufferedInputStream
[*](newPingInputStream(inStream)));//将输入流装饰成DataInputStream
[*]this.out=newDataOutputStream
[*](newBufferedOutputStream(outStream));//将输出流装饰成DataOutputStream
[*]writeHeader();
[*]//跟新活动时间
[*]touch();
[*]//当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
[*]start();
[*]return;
[*]}
[*]}catch(IOExceptione){
[*]markClosed(e);
[*]close();
[*]}
[*]}

再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法:

代码七:




[*]privatesynchronizedvoidsetupConnection()throwsIOException{
[*]shortioFailures=0;
[*]shorttimeoutFailures=0;
[*]while(true){
[*]try{
[*]this.socket=socketFactory.createSocket();//终于看到创建socket的方法了
[*]this.socket.setTcpNoDelay(tcpNoDelay);
[*]···
[*]//设置连接超时为20s
[*]NetUtils.connect(this.socket,remoteId.getAddress(),20000);
[*]this.socket.setSoTimeout(pingInterval);
[*]return;
[*]}catch(SocketTimeoutExceptiontoe){
[*]/*设置最多连接重试为45次。
[*]*总共有20s*45=15分钟的重试时间。
[*]*/
[*]handleConnectionFailure(timeoutFailures++,45,toe);
[*]}catch(IOExceptionie){
[*]handleConnectionFailure(ioFailures++,maxRetries,ie);
[*]}
[*]}
[*]}

终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的socket进行通信。呵呵,那服务端是不是也是创建一个ServerSocket进行通信的呢?呵呵,先不要急,到这里我们只解决了客户端的第一个问题,下面还有两个问题没有解决呢,我们一个一个地来解决吧。

问题2:客户端是怎样给服务端发送数据的?

我们回顾一下代码四吧。第一句为了完成连接的建立,我们已经分析完毕;而第二句是为了发送数据,呵呵,分析下去,看能不能解决我们的问题呢。下面贴出Client.Connection类的sendParam()方法吧:

代码八:




[*]publicvoidsendParam(Callcall){
[*]if(shouldCloseConnection.get()){
[*]return;
[*]}
[*]DataOutputBufferd=null;
[*]try{
[*]synchronized(this.out){
[*]if(LOG.isDebugEnabled())
[*]LOG.debug(getName()+"sending#"+call.id);
[*]//创建一个缓冲区
[*]d=newDataOutputBuffer();
[*]d.writeInt(call.id);
[*]call.param.write(d);
[*]byte[]data=d.getData();
[*]intdataLength=d.getLength();
[*]out.writeInt(dataLength);//首先写出数据的长度
[*]out.write(data,0,dataLength);//向服务端写数据
[*]out.flush();
[*]}
[*]}catch(IOExceptione){
[*]markClosed(e);
[*]}finally{
[*]IOUtils.closeStream(d);
[*]}
[*]}

其实这就是java io的socket发送数据的一般过程哦,没有什么特别之处。到这里问题二也解决了,来看看问题三吧。

问题3:客户端是怎样获取服务端的返回数据的?

我们再回顾一下代码六吧。代码六中,当连接建立时会启动一个线程用于处理服务端返回的数据,我们看看这个处理线程是怎么实现的吧,下面贴出Client.Connection类和Client.Call类中的相关方法吧:

代码九:




[*]方法一:
[*]publicvoidrun(){
[*]···
[*]while(waitForWork()){
[*]receiveResponse();//具体的处理方法
[*]}
[*]close();
[*]···
[*]}
[*]
[*]方法二:
[*]privatevoidreceiveResponse(){
[*]if(shouldCloseConnection.get()){
[*]return;
[*]}
[*]touch();
[*]try{
[*]intid=in.readInt();//阻塞读取id
[*]if(LOG.isDebugEnabled())
[*]LOG.debug(getName()+"gotvalue#"+id);
[*]Callcall=calls.get(id);//在calls池中找到发送时的那个对象
[*]intstate=in.readInt();//阻塞读取call对象的状态
[*]if(state==Status.SUCCESS.state){
[*]Writablevalue=ReflectionUtils.newInstance(valueClass,conf);
[*]value.readFields(in);//读取数据
[*]//将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
[*]call.setValue(value);
[*]calls.remove(id);//删除已处理的call
[*]}elseif(state==Status.ERROR.state){
[*]···
[*]}elseif(state==Status.FATAL.state){
[*]···
[*]}
[*]}catch(IOExceptione){
[*]markClosed(e);
[*]}
[*]}
[*]
[*]方法三:
[*]publicsynchronizedvoidsetValue(Writablevalue){
[*]this.value=value;
[*]callComplete();//具体实现
[*]}
[*]protectedsynchronizedvoidcallComplete(){
[*]this.done=true;
[*]notify();//唤醒client等待线程
[*]}

代码九完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析Server端的源码吧。

四.ipc.Server源码分析

同样,为了让大家对ipc.Server有个初步的了解,我们先分析一下它的几个内部类吧:

Call:用于存储客户端发来的请求
Listener: 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection:连接类,真正的客户端请求读取逻辑在这个类中。
Handler:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

如果你看过ipc.Server的源码,你会发现其实ipc.Server是一个abstract修饰的抽象类。那随之而来的问题就是:hadoop是怎样初始化RPC的Server端的呢?这个问题着实也让我想了好长时间。不过后来我想到Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码吧:

1. 初始化Server

代码十:




[*]privatevoidinitialize(Configurationconf)throwsIOException{
[*]···
[*]//创建rpcserver
[*]InetSocketAddressdnSocketAddr=getServiceRpcServerAddress(conf);
[*]if(dnSocketAddr!=null){
[*]intserviceHandlerCount=
[*]conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
[*]DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
[*]//获得serviceRpcServer
[*]this.serviceRpcServer=RPC.getServer(this,dnSocketAddr.getHostName(),
[*]dnSocketAddr.getPort(),serviceHandlerCount,
[*]false,conf,namesystem.getDelegationTokenSecretManager());
[*]this.serviceRPCAddress=this.serviceRpcServer.getListenerAddress();
[*]setRpcServiceServerAddress(conf);
[*]}
[*]//获得server
[*]this.server=RPC.getServer(this,socAddr.getHostName(),
[*]socAddr.getPort(),handlerCount,false,conf,namesystem
[*].getDelegationTokenSecretManager());
[*]
[*]···
[*]this.server.start();//启动RPCserverClients只允许连接该server
[*]if(serviceRpcServer!=null){
[*]serviceRpcServer.start();//启动RPCserviceRpcServer为HDFS服务的server
[*]}
[*]startTrashEmptier(conf);
[*]}

查看Namenode初始化源码得知:RPC的server对象是通过ipc.RPC类的getServer()方法获得的。下面咱们去看看ipc.RPC类中的getServer()源码吧:

代码十一:




[*]publicstaticServergetServer(finalObjectinstance,finalStringbindAddress,finalintport,
[*]finalintnumHandlers,
[*]finalbooleanverbose,Configurationconf,
[*]SecretManager<?extendsTokenIdentifier>secretManager)
[*]throwsIOException{
[*]returnnewServer(instance,conf,bindAddress,port,numHandlers,verbose,secretManager);
[*]}


这时我们发现getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。哈哈,现在你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数还是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。

2. 运行Server
如代码十所示,初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:

代码十二:




[*]/**启动服务*/
[*]publicsynchronizedvoidstart(){
[*]responder.start();//启动responder
[*]listener.start();//启动listener
[*]handlers=newHandler;
[*]
[*]for(inti=0;i<handlerCount;i++){
[*]handlers=newHandler(i);
[*]handlers.start();//逐个启动Handler
[*]}
[*]}

3. Server处理请求

1)建立连接
分析过ipc.Client源码后,我们知道Client端的底层通信直接采用了阻塞式IO编程,当时我们曾做出猜测:Server端是不是也采用了阻塞式IO。现在我们仔细地分析一下吧,如果Server端也采用阻塞式IO,当连接进来的Client端很多时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,所以他们采用了java NIO来实现Server端,java NIO可参考博客:http://weixiaolu.iyunv.com/blog/1479656。那Server端采用javaNIO是怎么建立连接的呢?分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数吧:

代码十三:




[*]publicListener()throwsIOException{
[*]address=newInetSocketAddress(bindAddress,port);
[*]//创建ServerSocketChannel,并设置成非阻塞式
[*]acceptChannel=ServerSocketChannel.open();
[*]acceptChannel.configureBlocking(false);
[*]
[*]//将serversocket绑定到本地端口
[*]bind(acceptChannel.socket(),address,backlogLength);
[*]port=acceptChannel.socket().getLocalPort();
[*]//获得一个selector
[*]selector=Selector.open();
[*]readers=newReader;
[*]readPool=Executors.newFixedThreadPool(readThreads);
[*]//启动多个reader线程,为了防止请求多时服务端响应延时的问题
[*]for(inti=0;i<readThreads;i++){
[*]SelectorreadSelector=Selector.open();
[*]Readerreader=newReader(readSelector);
[*]readers=reader;
[*]readPool.execute(reader);
[*]}
[*]//注册连接事件
[*]acceptChannel.register(selector,SelectionKey.OP_ACCEPT);
[*]this.setName("IPCServerlisteneron"+port);
[*]this.setDaemon(true);
[*]}


在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:

代码十四:




[*]publicvoidrun(){
[*]···
[*]while(running){
[*]SelectionKeykey=null;
[*]try{
[*]selector.select();
[*]Iterator<SelectionKey>iter=selector.selectedKeys().iterator();
[*]while(iter.hasNext()){
[*]key=iter.next();
[*]iter.remove();
[*]try{
[*]if(key.isValid()){
[*]if(key.isAcceptable())
[*]doAccept(key);//具体的连接方法
[*]}
[*]}catch(IOExceptione){
[*]}
[*]key=null;
[*]}
[*]}catch(OutOfMemoryErrore){
[*]···
[*]}

下面贴出Server.Listener类中doAccept ()方法中的关键源码吧:

代码十五:




[*]voiddoAccept(SelectionKeykey)throwsIOException,OutOfMemoryError{
[*]Connectionc=null;
[*]ServerSocketChannelserver=(ServerSocketChannel)key.channel();
[*]SocketChannelchannel;
[*]while((channel=server.accept())!=null){//建立连接
[*]channel.configureBlocking(false);
[*]channel.socket().setTcpNoDelay(tcpNoDelay);
[*]Readerreader=getReader();//从readers池中获得一个reader
[*]try{
[*]reader.startAdd();//激活readSelector,设置adding为true
[*]SelectionKeyreadKey=reader.registerChannel(channel);//将读事件设置成兴趣事件
[*]c=newConnection(readKey,channel,System.currentTimeMillis());//创建一个连接对象
[*]readKey.attach(c);//将connection对象注入readKey
[*]synchronized(connectionList){
[*]connectionList.add(numConnections,c);
[*]numConnections++;
[*]}
[*]···
[*]}finally{
[*]//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
[*]//用了wait()方法等待。因篇幅有限,就不贴出源码了。
[*]reader.finishAdd();
[*]}
[*]}
[*]}

当reader被唤醒,reader接着执行doRead()方法。

2)接收请求
下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:

代码十六:




[*]方法一:
[*]voiddoRead(SelectionKeykey)throwsInterruptedException{
[*]intcount=0;
[*]Connectionc=(Connection)key.attachment();//获得connection对象
[*]if(c==null){
[*]return;
[*]}
[*]c.setLastContact(System.currentTimeMillis());
[*]try{
[*]count=c.readAndProcess();//接受并处理请求
[*]}catch(InterruptedExceptionieo){
[*]···
[*]}
[*]···
[*]}
[*]
[*]方法二:
[*]publicintreadAndProcess()throwsIOException,InterruptedException{
[*]while(true){
[*]···
[*]if(!rpcHeaderRead){
[*]if(rpcHeaderBuffer==null){
[*]rpcHeaderBuffer=ByteBuffer.allocate(2);
[*]}
[*]//读取请求头
[*]count=channelRead(channel,rpcHeaderBuffer);
[*]if(count<0||rpcHeaderBuffer.remaining()>0){
[*]returncount;
[*]}
[*]//读取请求版本号
[*]intversion=rpcHeaderBuffer.get(0);
[*]byte[]method=newbyte[]{rpcHeaderBuffer.get(1)};
[*]···
[*]
[*]data=ByteBuffer.allocate(dataLength);
[*]}
[*]//读取请求
[*]count=channelRead(channel,data);
[*]
[*]if(data.remaining()==0){
[*]···
[*]if(useSasl){
[*]···
[*]}else{
[*]processOneRpc(data.array());//处理请求
[*]}
[*]···
[*]}
[*]}
[*]returncount;
[*]}
[*]}


3)获得call对象
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

代码十七:




[*]方法一:
[*]privatevoidprocessOneRpc(byte[]buf)throwsIOException,
[*]InterruptedException{
[*]if(headerRead){
[*]processData(buf);
[*]}else{
[*]processHeader(buf);
[*]headerRead=true;
[*]if(!authorizeConnection()){
[*]thrownewAccessControlException("Connectionfrom"+this
[*]+"forprotocol"+header.getProtocol()
[*]+"isunauthorizedforuser"+user);
[*]}
[*]}
[*]}
[*]方法二:
[*]privatevoidprocessData(byte[]buf)throwsIOException,InterruptedException{
[*]DataInputStreamdis=
[*]newDataInputStream(newByteArrayInputStream(buf));
[*]intid=dis.readInt();//尝试读取id
[*]Writableparam=ReflectionUtils.newInstance(paramClass,conf);//读取参数
[*]param.readFields(dis);
[*]
[*]Callcall=newCall(id,param,this);//封装成call
[*]callQueue.put(call);//将call存入callQueue
[*]incRpcCount();//增加rpc请求的计数
[*]}

4)处理call对象
你还记得Server类中还有个Handler内部类吗?呵呵,对call对象的处理就是它干的。下面贴出Server.Handler类中run()方法中的关键代码:

代码十八:




[*]while(running){
[*]try{
[*]finalCallcall=callQueue.take();//弹出call,可能会阻塞
[*]···
[*]//调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
[*]value=call(call.connection.protocol,call.param,call.timestamp);
[*]synchronized(call.connection.responseQueue){
[*]setupResponse(buf,call,
[*](error==null)?Status.SUCCESS:Status.ERROR,
[*]value,errorClass,error);
[*]···
[*]//给客户端响应请求
[*]responder.doRespond(call);
[*]}
[*]}




5)返回请求
下面贴出Server.Responder类中的doRespond()方法源码:

代码十九:




[*]方法一:
[*]voiddoRespond(Callcall)throwsIOException{
[*]synchronized(call.connection.responseQueue){
[*]call.connection.responseQueue.addLast(call);
[*]if(call.connection.responseQueue.size()==1){
[*]//返回响应结果,并激活writeSelector
[*]processResponse(call.connection.responseQueue,true);
[*]}
[*]}
[*]}

页: [1]
查看完整版本: Hadoop中的RPC机制分析(基于Java NIO机制)