Kafka 测试环境宕机原因查询(二)
上节遗留问题a) 关于 inLock(controllerContext.controllerLock) 这个代码, 因为 controllerLock 是 ReentrantLock 类型
的锁, 所以如果 处理 session失效的zookeeper线程成功抢占当前嗦的话,就能进入到 handleNewSession 内部方法, 并进一步
执行 onControllerResignation 里面的代码。 而正式因为可重入锁, 也可以执行 onControllerResignation 里面的代码
b) 里面有 deleteTopicManager.shutdown()-> deleteTopicsThread.shutdown() 方法。而此时,对应的 controllerLock
已经调用了 两次 lock, 第一次是 handleNewSession 中的 lock, 第二次是 onControllerResignation 的lock。
启动分析
a) 10.3.63.5/6/7 三台服务节点,对应的是 1, 2, 3。 三台服务于 2015-01-20 13:51 左右同时启动。
b) 5 上面的 controller.log 如下:
INFO : Initialized controller epoch to 21 and zk version 20 (kafka.controller.ControllerEpochListener)
INFO : Controller starting up (kafka.controller.KafkaController)
INFO : Controller startup complete (kafka.controller.KafkaController)
6 上面的 controller.log 如下:
INFO : Initialized controller epoch to 21 and zk version 20 (kafka.controller.ControllerEpochListener)
INFO : Controller starting up (kafka.controller.KafkaController)
INFO : Controller startup complete (kafka.controller.KafkaController)
7 上面的 controller.log 如下:
INFO : Initialized controller epoch to 20 and zk version 19 (kafka.controller.ControllerE
pochListener)
INFO : Controller starting up (kafka.controller.KafkaController)
INFO : Broker 3 starting become controller state transition (kafka.controller.KafkaController)
INFO : Controller 3 incremented epoch to 21 (kafka.controller.KafkaController)
INFO : Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
INFO : Partitions that completed preferred replica election: (kafka.controller.KafkaController)
INFO : Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
INFO : Partitions being reassigned: Map() (kafka.controller.KafkaController)
INFO : Partitions already reassigned: List() (kafka.controller.KafkaController)
INFO : Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
INFO : List of topics to be deleted: (kafka.controller.KafkaController)
INFO : List of topics ineligible for deletion: V1_A,V1_TEST,V1_B (kafka.controller.KafkaController)
INFO : Currently active brokers in the cluster: Set() (kafka.controller.KafkaController)
INFO : Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
INFO : Current list of topics in the cluster: Set(V1_TEST, V1_B, V1_A) (kafka.controller.KafkaController)
INFO : Started replica state machine with initial state -> Map([Topic=V1_TEST,Pa
rtition=9,Replica=2] -> ReplicaDeletionIneligible, -> ReplicaDeletionIneligible, [Topic=V1_A,Partition=4,Repli
...
INFO : Started partition state machine with initial state -> Map( -
> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, [V
1_TEST,2] -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> Offlin
ePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -
> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition,
-> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePartition, -> OfflinePa
rtition, -> OfflinePartition, -> OfflinePartition, -> NewPartition, -> OfflinePartition) (kafka.contro
ller.PartitionStateMachine)
INFO : Broker 3 is ready to serve as the new controller with epoch 21 (kafka.controller.KafkaController)
INFO : Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
INFO : Invoking state change to OnlinePartition for partitions (kafka.control
ler.PartitionStateMachine)
INFO : Controller startup complete (kafka.controller.KafkaController)
c) 通过查看zookeeper的broker/ids 的节点, 发现先出现 3, 然后是 2, 然后是1。而且根据上面的日志显示,只有 节点 3 真正成为了 controller。 原理是在 KafkaController
startup 的时候进行了选举, 如果谁能够第一个写入到zookeeper 的 /controler 节点成功,则成为真正意义上的 controller, 目前便于理解说是主controller。然后在 成功成为主
controller后会调用 KafkaController 的 onControllerFailover 方法。这个回调是在 ZookeeperLeaderElector .elect 方法中的 onBecomingLeader()
d) 这样的话,就不是所有的controller 都启动了 deleteTopicManager.start(), 只有主的controller 启动了。而只有主的controller 才执行了 onControllerFailover ->
initializeControllerContext() ->initializeTopicDeletion() -> deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion,
topicsIneligibleForDeletion)
e) 而其他的controller 是没有启动 deleteTopicManager 的, 从上面的代码也可以看到, deleteTopicManager 也是没有初始化的,即还是 null 。
f) 根据上面的得出,就是在zookeeper的session失效时, 进行回调。先回调 KafkaController 的 handleNewSession, 然后回调 KafkaHealthcheck 的 handleNewSession
方法。 根据zookeeper 的源码得出,执行这个回调的是另外一个线程。所以会先输出 ZK expired; shut down all controller components and try to re-elect 代码。然后
是执行 inLock(controllerContext.controllerLock), 而 controllerLock 是 ReentrantLock。 所以,如果当前有其他线程先拿到这个锁,就导致内部的代码无法执行。这也
就能解释为什么有的执行内部代码了, 有的没有执行。
g) 上面我们假设执行了内部代码, 所以会往下执行一直到 onControllerResignation() -> deleteTopicManager.shutdown() -> deleteTopicsThread.shutdown()。实际
执行的代码如下:
def shutdown(): Unit = {
info("Shutting down")
isRunning.set(false)
if (isInterruptible)
interrupt()
shutdownLatch.await()
info("Shutdown completed")
}
如果这个失效所在的节点是 主 controller 的话, 这个线程是启动了的,也就是会调用 start方法, 代码如下。 如果不是主节点, 是不会调用 start 方法。 这样在 shutdownLatch.await()
处一直等待。
override def run(): Unit = {
info("Starting ")
try{
while(isRunning.get()){
doWork()
}
} catch{
case e: Throwable =>
if(isRunning.get())
error("Error due to ", e)
}
shutdownLatch.countDown()
info("Stopped ")
}
h) 这里面有一个疑问是, 如果不是主controller, deleteTopicManager 和 deleteTopicsThread 都不会被初始化的,也就是 null 。 执行代码会抛出空指针异常的, 目前
却没有遇到。 暂时还不能解决这个问题。。。。。
I0Itec zkclient 源码地址
http://www.boyunjian.com/javasrc/com.github.sgroschupf/zkclient/0.1/_/org/I0Itec/zkclient/util/ZkEventThread.java#
ReentrantLock 分析:
http://blog.csdn.net/aesop_wubo/article/details/7574379
server.log 分析
a) 5 上面的 server.log 如下:
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
INFO Property host.name is overridden to 10.3.63.5 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to true (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /data/kafka-log (kafka.utils.VerifiableProperties)
INFO Property log.retention.bytes is overridden to 5368709120 (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 600000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 720 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 500 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to 10.3.63.8:2181,10.3.63.9:2181 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 500000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on 10.3.63.8:2181,10.3.63.9:2181 (kafka.server.KafkaServer)
b) 6 上面的server.log 如下:
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property broker.id is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property host.name is overridden to 10.3.63.6 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to true (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /data/kafka-log (kafka.utils.VerifiableProperties)
INFO Property log.retention.bytes is overridden to 5368709120 (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 600000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 720 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 500 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to 10.3.63.8:2181,10.3.63.9:2181 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 500000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on 10.3.63.8:2181,10.3.63.9:2181 (kafka.server.KafkaServer)
c) 7 上面的 server.log 如下:
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property broker.id is overridden to 3 (kafka.utils.VerifiableProperties)
INFO Property host.name is overridden to 10.3.63.7 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to true (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /data/kafka-log (kafka.utils.VerifiableProperties)
INFO Property log.retention.bytes is overridden to 5368709120 (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 600000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 720 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 500 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to 10.3.63.8:2181,10.3.63.9:2181 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 500000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on 10.3.63.8:2181,10.3.63.9:2181 (kafka.server.KafkaServer)
d) 目前看到的日志结果是一样的。
上述分析问题
a) 按照上面的分析,如果是主controller 的话,则不可能出现 死掉的现象。 而现实的情况是 7 也确实出现了 以前出现的现象。 下面是 7 上的 controller 日志
INFO , ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
INFO , Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
上节遗留问题再分析
a) 由上面的分析我们知道, 上个节点只有一个成功写入 /controler 节点的才能成为真正的 controller,
这个真正的controller会启动一个 DeleteTopicsThread 的线程, 等待用户的删除话题或者其他的删除
话题的请求,并处理这些请求。
b) 等待是通过 doWork() -> awaitTopicDeletionNotification() -> deleteTopicsCond.await()
来挂起的。通过 await 方法, 可以使其他线程能够获取到对应的锁
c) 而 deleteTopicsCond = controllerContext.controllerLock.newCondition() 是原来 controller
中的 ReentrantLock 而得到的。参考下面的链接能知道, 正是因为 DeleteTopicsThread 的 deleteTopicsCond.await()
方法,才能够使 KafkaController 的handNession 能够获取到对应的锁。
d) 此时的 handNewssion 会导致 DeleteTopicsThread 的 shutdown 操作。而如果此时没有 其他线程占用 ReentrantLock
锁的话, deleteTopicsCond.await() 方法, 会因为 interrupt() 方法而打断。 但是, 这个shutdown 操作,是 zookeepeer
的线程 在 handNewssion 操作,在操作之前已经获得了 ReentrantLock 的锁, 而此时, DeleteTopicsThread 需要重新或得
这个锁, 就会一直等待。 await 代码参考如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 将当前线程包装下后,
// 添加到Condition自己维护的一个链表中。
int savedState = fullyRelease(node);// 释放当前线程占有的锁,从demo中看到,
// 调用await前,当前线程是占有锁的
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 释放完毕后,遍历AQS的队列,看当前节点是否在队列中,
// 不在 说明它还没有竞争锁的资格,所以继续将自己沉睡。
// 直到它被加入到队列中,聪明的你可能猜到了,
// 没有错,在singal的时候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。
而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。
线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。
接着马上被加入到Condition的等待队列中,以为着该线程需要signal信号。
线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。
线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。 注意,这个时候,线程1 并没有被唤醒。
signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。
直到释放所整个过程执行完毕。
关于 ReentrantLock
http://tenyears.iteye.com/blog/48750
关于 ReentrantLock.newCondition()
http://www.importnew.com/9281.html
http://tiangu.iteye.com/blog/1486949
http://www.iyunv.com/kf/201401/274069.html
http://blog.csdn.net/chen77716/article/details/6641477
http://agapple.iteye.com/blog/966490
http://whitesock.iteye.com/blog/1337539
http://www.molotang.com/articles/480.html
http://coderbee.net/index.php/concurrent/20131209/614
http://ifeve.com/introduce-abstractqueuedsynchronizer/
apache-jira 对应的问题
https://issues.apache.org/jira/browse/KAFKA-1903
https://issues.apache.org/jira/browse/KAFKA-1663
页:
[1]