/**
* Start the socket server
*/
def startup() {
val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
for(i <- 0 until numProcessorThreads) {
processors(i) = new Processor(i,
time,
maxRequestSize,
aggregateIdleMeter,
newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
numProcessorThreads,
requestChannel,
quotas,
connectionsMaxIdleMs)
//processor负责接受网络请求和响应,请求read:放入RequestChannel里的queue中,响应write
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}
newGauge("ResponsesBeingSent", new Gauge[Int] {
def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
})
// register the processor threads for notification of responses
// 将processor的wakeup作为方法,加入到requestchannel的listener,对应processor的id
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
//接受网络accept请求,将nio的connection放入processor的connnectlist中,让processor处理之后的请求相应
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
acceptor.awaitStartup
info("Started")
}
获得请求,放入queue中,等待handler处理
/*
* Process reads from ready sockets
*/
def read(key: SelectionKey) {
lruConnections.put(key, currentTimeNanos)
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)//放入queue中,等待handler处理
key.attach(null)
// explicitly reset interest ops to not READ, no need to wake up the selector just yet
key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
}
响应请求
/*
* Process writes to ready sockets
*/
def write(key: SelectionKey) {
val socketChannel = channelFor(key)
val response = key.attachment().asInstanceOf[RequestChannel.Response]
val responseSend = response.responseSend
if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)//将response发送相应给client
trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
if(responseSend.complete) {
response.request.updateRequestMetrics()
key.attach(null)
trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
} else {
trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_WRITE)
wakeup()
}
}