Server used for receiving/sending a block of data. This is created to listen for requests from clients or other DataNodes. This small server does not use the Hadoop IPC mechanism. DataXceiverServer用于接收和发送block数据,它监听着client或者其它DataNode的请求。DataXceiverServer没有采用RPC机制:DataXceiverServer是流式机制,而RPC是命令式接口。
DataXceiverServer每接收到一个请求,就会创建一个DataXceiver来处理该请求。
/**
 * Accept loop for block data transfer connections.
 *
 * <p>Listens on the supplied {@link ServerSocket} and, for every accepted
 * connection, starts a daemon thread running a {@code DataXceiver} that
 * services the request on that socket.
 */
class DataXceiverServer implements Runnable, FSConstants {
  public static final Log LOG = DataNode.LOG;

  ServerSocket ss;
  DataNode datanode;

  // Record all sockets opened for data transfer
  Map<Socket, Socket> childSockets =
      Collections.synchronizedMap(new HashMap<Socket, Socket>());

  // Default limit (256) on how many DataXceivers a node may start;
  // too many of them may exhaust memory.
  static final int MAX_XCEIVER_COUNT = 256;

  // NOTE(review): never assigned in the code shown here — confirm where it is initialized.
  BlockBalanceThrottler balanceThrottler;

  /**
   * @param ss       listening socket on which connections are accepted
   * @param conf     configuration; not read by this constructor as shown
   * @param datanode owning DataNode, whose {@code shouldRun} flag controls the accept loop
   */
  DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) {
    this.ss = ss;
    this.datanode = datanode;
  }

  /**
   * Accepts connections until {@code datanode.shouldRun} becomes false or the
   * listening socket is closed, handing each connection to a new DataXceiver
   * daemon thread. The listening socket is closed when the loop exits.
   */
  public void run() {
    // Stop once the socket is closed: accept() would otherwise fail
    // immediately on every iteration and spin.
    while (datanode.shouldRun && !ss.isClosed()) {
      try {
        Socket s = ss.accept(); // wait for a client / DataNode connection
        s.setTcpNoDelay(true);
        // A DataXceiver needs the socket, the DataNode and this server.
        new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start();
      } catch (java.net.SocketTimeoutException ignored) {
        // accept() timed out; loop around to re-check shouldRun.
      } catch (java.io.IOException ie) {
        // A failure on one connection must not terminate the accept loop.
        LOG.warn("DataXceiverServer: failed to accept connection", ie);
      }
    }
    try {
      ss.close();
    } catch (java.io.IOException ie) {
      LOG.warn("DataXceiverServer: failed to close server socket", ie);
    }
  }
}
三.DataXceiver类图
DataXceiver处理Client或DataNode的五种请求(操作码由DataTransferProtocol接口定义;下面列出的六个操作码中,OP_READ_METADATA已废弃、不再支持,因此实际处理的是其余五种)。
// Operation codes read as a single byte from the data stream; DataXceiver.run()
// switches on this byte to pick the handler.

// Write a block (handled by writeBlock).
public static final byte OP_WRITE_BLOCK = (byte) 80;
// Read a block (handled by readBlock).
public static final byte OP_READ_BLOCK = (byte) 81;
/**
* @deprecated As of version 15, OP_READ_METADATA is no longer supported
*/
@Deprecated public static final byte OP_READ_METADATA = (byte) 82;
// Replace a block (handled by replaceBlock).
public static final byte OP_REPLACE_BLOCK = (byte) 83;
// Copy a block, for balancing purposes (handled by copyBlock).
public static final byte OP_COPY_BLOCK = (byte) 84;
// Get the checksum of a block (handled by getBlockChecksum).
public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
class DataXceiver implements Runnable, FSConstants {
Socket s;
final String remoteAddress; // address of remote side
final String localAddress; // local address of this daemon
DataNode datanode;
DataXceiverServer dataXceiverServer;
public DataXceiver(Socket s, DataNode datanode,
DataXceiverServer dataXceiverServer) {
this.s = s;
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
dataXceiverServer.childSockets.put(s, s);
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
}
public void run() {
DataInputStream in = null;
in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));
short version = in.readShort();if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
}//version
boolean local = s.getInetAddress().equals(s.getLocalAddress());//本地请求
byte op = in.readByte();//op
switch (op) {
//读数据块
case DataTransferProtocol.OP_READ_BLOCK:
readBlock(in);
datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
if (local)
datanode.myMetrics.incrReadsFromLocalClient();
else
datanode.myMetrics.incrReadsFromRemoteClient();
break;
//写数据块
case DataTransferProtocol.OP_WRITE_BLOCK:
writeBlock(in);
datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
if (local)
datanode.myMetrics.incrWritesFromLocalClient();
else
datanode.myMetrics.incrWritesFromRemoteClient();
break;
//替换数据块
case DataTransferProtocol.OP_REPLACE_BLOCK:
replaceBlock(in);
datanode.myMetrics
.addReplaceBlockOp(DataNode.now() - startTime);
break;
//拷贝数据块
case DataTransferProtocol.OP_COPY_BLOCK:
// for balancing purpose; send to a proxy source
copyBlock(in);
datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
break;
//读取数据块校验码
case DataTransferProtocol.OP_BLOCK_CHECKSUM: // get the checksum of
// a block
getBlockChecksum(in);
datanode.myMetrics.addBlockChecksumOp(DataNode.now()
- startTime);
break;
default:
throw new IOException("Unknown opcode " + op
+ " in data stream");
}
dataXceiverServer.childSockets.remove(s);
}
}