设为首页 收藏本站
查看: 747|回复: 0

[经验分享] Hadoop-远程过程调用

[复制链接]

尚未签到

发表于 2016-12-9 06:29:09 | 显示全部楼层 |阅读模式
 
 
 
Hadoop IPC类图如下
DSC0000.jpg
 
 

 
连接

//为了提高通讯效率,连接是可以复用的,通过ConnectionId来区分不同的连接
class ConnectionId {
InetSocketAddress address;         //远端服务器的地址
UserGroupInformation ticket;       //用户和用户组的信息
Class<?> protocol;                     //IPC接口对应的类对象
}
//
ConnectionHeader类是客户端和服务端TCP连接建立之后交换的第一条消息,包括ConnectionId中的
//用户信息和IPC接口信息,用于确认用户是否有权利连接
ConnectionHeader

//服务端连接对象
public class Connection {
private boolean rpcHeaderRead = false; //是否已读如入了RPC版本号
private boolean headerRead = false;  //是否读入了连接消息头
private SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
private LinkedList<Call> responseQueue;
private volatile int rpcCount = 0; //当前正在处理的RPC数量
private long lastContact;
private int dataLength;
private Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
private String hostAddress;
private int remotePort;
private InetAddress addr;
ConnectionHeader header = new ConnectionHeader();
Class<?> protocol;
boolean useSasl;
SaslServer saslServer;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer rpcHeaderBuffer;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
UserGroupInformation user = null;
}
//客户端连接
private class Connection extends Thread {
private InetSocketAddress server;             //IPC服务端地址
private String serverPrincipal;  // server's krb5 principal name
private ConnectionHeader header;              //连接消息头
private final ConnectionId remoteId;                //IPC连接标识
private AuthMethod authMethod; // authentication method
private boolean useSasl;
private Token<? extends TokenIdentifier> token;
private SaslRpcClient saslRpcClient;
private Socket socket = null;                 // connected socket
private DataInputStream in;
private DataOutputStream out;
private int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private int pingInterval; // how often sends ping to the server in msecs
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
private IOException closeException; // close reason
}
   
  Call
  

//客户端
private class Call {
int id;                                       // call id
Writable param;                               // parameter
Writable value;                               // value, null if error
IOException error;                            // exception, null if value
boolean done;   
}
//服务端
private static class Call {
private int id;                               // the client's call id
private Writable param;                       // the parameter passed
private Connection connection;                // connection to client
private long timestamp;
}
//客户端和服务端通过各自的Call对象发送调用
客户端还有ParallelCall 用于同时发送多个远程IPC调用
   
  服务端处理
  

//处理监听事件的线程
class Listener extends Thread {
//创建SeverSocketChannel,并注册ACCEPT事件
public Listener() {
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers = reader;
readPool.execute(reader);
}
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
}
//处理ACCEPT事件
public void run() {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
}            
}
}
//Reader线程,用于处理读事件并交由Handler线程处理
class Reader implements Runnable {
public void run() {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
}              
}
}
//异步的处理写事件
class Responder extends Thread {
public void run() {
waitPending();     // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}            
}
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}
}
}

void doAsyncWrite(SelectionKey key) {
synchronized(call.connection.responseQueue) {
processResponse(call.connection.responseQueue, false);
}
}

//inHandler用于表示是Handler中直接调用写操作
//还是Responer线程的异步写操作
void processResponse(LinkedList<Call> responseQueue,boolean inHandler) {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);         
}

void doRead(SelectionKey key) {
Connection c = (Connection)key.attachment();
count = c.readAndProcess();
}
   
 
 
 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311515-1-1.html 上篇帖子: Hadoop-balancer执行原理 下篇帖子: hadoop在加载了别的包后如何判断已经加载进来了
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表