设为首页 收藏本站
查看: 884|回复: 0

[经验分享] 第六章:小朱笔记hadoop之源码分析-ipc分析 第四节:RPC类分析

[复制链接]

尚未签到

发表于 2016-12-13 10:05:24 | 显示全部楼层 |阅读模式
第六章:小朱笔记hadoop之源码分析-ipc分析

第四节:RPC类分析

RPC类是对Server、Client的具体化。在RPC类中规定,客户程序发出请求调用时,参数类型必须是Invocation;从服务器返回的值类型必须是ObjectWritable。
RPC类是对Server、Client的包装,简化用户的使用。如果一个类需充当服务器,只需通过RPC类的静态方法getServer获得Server实例,然后start。同时此类提供协议接口的实现。如果一个类充当客户端,可以通过getProxy或者waitForProxy获得一个实现了协议接口的proxy object,与服务器端交互。

RPC类中有5个静态内部类,分别为:

写道

Invocation :用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server : org.apache.hadoop.ipc.Server的具体实现类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,反射调用即可。
VersionMismatch:版本不匹配异常。

 
(1)Invocation
 

    /** A method invocation, including the method name and its parameters. */
private static class Invocation implements Writable, Configurable {
private String methodName; // 方法名
private Class[] parameterClasses; // 参数类型集合
private Object[] parameters; // 参数值
private Configuration conf; // 配置类实例
......
}
(2)ClientCache
 

    /* Cache a client using its socket factory as the hash key */
static private class ClientCache {
// 该内部类定义了一个缓存Map
private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
* 通过客户端org.apache.hadoop.ipc.Client的SocketFactory可以快速取出对应的Client实例
*
* @param conf
*            Configuration
* @return an IPC client
*/
/**
* 从缓存Map中取出一个IPC Client实例,如果缓存够中不存在,就创建一个兵加入到缓存Map中
*/
private synchronized Client getClient(Configuration conf, SocketFactory factory) {
// Construct & cache client. The configuration is only used for
// timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout
// for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
client = new Client(ObjectWritable.class, conf, factory); // 通过反射实例化一个ObjectWritable对象,构造Client实例
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
/**
* Construct & cache an IPC client with the default SocketFactory if no
* cached client exists.
*
* @param conf
*            Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf) {
return getClient(conf, SocketFactory.getDefault());
}
/**
* Stop a RPC client connection A RPC client is closed only when its
* reference count becomes zero.
*/
private void stopClient(Client client) {
synchronized (this) {
client.decCount(); // 该client实例的引用计数减1
if (client.isZeroReference()) { // 如果client实例的引用计数此时为0
clients.remove(client.getSocketFactory()); // 从缓存中删除
}
}
if (client.isZeroReference()) { // 如果client实例引用计数为0,需要关闭
client.stop(); // 停止所有与该client实例相关的线程
}
}
}
(3)Invoker
 

private static class Invoker implements InvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
// 构造一个RPC.Invocation实例作为参数传递给调用程序,执行调用,返回值为value
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
synchronized private void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
}
 
(4)Server

public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
Invocation call = (Invocation)param;
// 通过反射,根据调用方法名和方法参数类型得到Method实例  
Method method =protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);// 设置反射的对象在使用时取消Java语言访问检查,提高效率  
Object value = method.invoke(instance, call.getParameters());// 执行调用(instance是调用底层方法的对象,第二个参数是方法调用的参数)     
}
 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-313641-1-1.html 上篇帖子: 第七章:小朱笔记hadoop之源码分析-hdfs分析 第四节:namenode分析-format过程分析 下篇帖子: Indexing and Searching on a Hadoop Distributed File System (如何在HDFS文件上建索引)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表