设为首页 收藏本站
查看: 1072|回复: 0

[经验分享] Kafka 启动过程

[复制链接]

尚未签到

发表于 2017-5-23 16:56:42 | 显示全部楼层 |阅读模式
  1, 每个broker启动的时候都会去注册一个临时节点 /controller, 那个broker先注册这个节点,那个就是所有broker的leader,并将自己的信息写入到这个临时节点里面。如下:
  [zk: 10.3.63.204:2181,10.3.63.205:2181(CONNECTED) 3] get /controller
{"version":1,"brokerid":0,"timestamp":"1407310302044"}
cZxid = 0x700000592
ctime = Wed Aug 06 15:32:01 CST 2014
mZxid = 0x700000592
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x700000592
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x147aa389edd0001
dataLength = 54
numChildren = 0
  每个broker都会起动kafkaController这个进程, 但只有一个是leader,controller主要是负责删除一些多余的
  topic或者其他选举某个topic的pation的leader使用。
  2,当关闭的时候,回调用KafkaServer的shutdown方法, 里面会先尝试关闭controller,具体调用代码如下:
  CoreUtils.swallow(controlledShutdown())。
代码的逻辑是从zookeeper的controller读出leader的id,并从broker/ids/id读出broker的信息, 然后发送一个
  ControlledShutdownRequest的请求到它上面,直到读到成功返回后才说明shutdownSuccessed
  3,  具体处理这个请求的逻辑世在KafkaApis中来处理的, 具体的代码如下:

  def handleControlledShutdownRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
ErrorMapping.NoError, partitionsRemaining)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
}
  里面可以看到,发送到主的leader上面,调用KafkaController的 def shutdownBroker(id: Int),工作的具体内容是循环topic的partittion, 然后判断当前的分区是否是主的, // If the broker leads the topic partition, transition the leader and update isr. Updates zk and // notifies all affected brokers
  如果不是的,
  // Stop the replica first. The state change below initiates ZK changes which should take some time
  // before which the stop replica request should be completed (in most cases)
  对应的问题是, 如果关闭controller的时间足够长的话,会导致timeout,然后会重新发送关闭的请求。因为锁的缘故,回导致再次的请求也会超时。这样会导致controller的非正常关闭, 重新启动时会有会滚的操作。 虽然这种情况下不会影响到具体的使用。
  https://issues.apache.org/jira/browse/KAFKA-1342
  分析启动过程:
  1, 设置状态为Starting
  2, kafkaScheduler.startup - 主要是后台需要定时执行的一些任务
  3, initZk - 初始化和zookeeper的链接
  4, logManager.startup - 这个主要是通过上面的scheduler来定时循环执行三个任务:kafka-log-retention kafka-log-flusher kafka-recovery-point-checkpoint,如果配置了清理的话,还会起动
  5, socketServer.startup,是个NIO的服务, 线程模型如下
  1 Acceptor thread that handles new connections
  N Processor threads that each have their own selector and read requests from sockets
  M Handler threads that handle requests and produce responses back to the processor threads for writing.
  6, replicaManager.startup - 主要是通过调度器定时执行 maybeShrinkIsr方法的线程
  7, createOffsetManager - 通过调度器启动定时执行 compact 方法的线程
  8, kafkaController.startup - 注册zk session失效事件,竞争leader。如果是leader的话,则会回调
  kafkaController的 onControllerFailover 方法。
  9, consumerCoordinator.startup - Kafka coordinator handles consumer group and consumer offset management 主要是处理消费组和消费者偏移量的问题
  10, start processing requests requestHandlerPool-KafkaApis 主要是通过KafkaRequestHandlerPool
  来启动处理请求的线程,每个线程实际最后调用的还是KafkaApis
  11, 设置状态 runningAsBroker
  12, topicConfigManager.startup - 主要监听 /config/changes,然后 Process the given list of config changes
  13, tell everyone we are alive - KafkaHealthcheck.startup,主要是和zk保持心跳连接
  14, register broker metrics - 主要是一些统计信息
  Broker的状态
  broker 有以下几种状态

case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object RunningAsController extends BrokerStates { val state: Byte = 4 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  状态之间的流转图如下:
  /**
 * Broker states are the possible state that a kafka broker can be in.
 * A broker should be only in one state at a time.
 * The expected state transition with the following defined states is:
 *
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 * +----------+     +-----+-----+  +-------+---------+
 * |RunningAs |     |RunningAs  |            |
 * |Controller+<--->+Broker     +<-----------+
 * +----------+     +-----+-----+
 *        |              |
 *        |              v
 *        |       +-----+------------+
 *        |-----> |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                     |
 *                     v
 *               +-----+-----+
 *               |Not Running|
 *               +-----------+
 *
 * Custom states is also allowed for cases where there are custom kafka states for different scenarios.
 */

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379861-1-1.html 上篇帖子: 【Kafka十二】关于Kafka是一个Commit Log Service 下篇帖子: 拨开kafka 的外套
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表