kafka集群Controller竞选与责任设计思路架构详解
无所不能的Controller
某一个broker被选举出来承担特殊的角色,就是控制器Controller。
Leader会向zookeeper上注册Watcher,其他broker几乎不用监听zookeeper的状态变化。
Controller集群就是用来管理和协调Kafka集群的,具体就是管理集群中所有分区的状态和分区对应副本的状态。
每一个Kafka集群任意时刻都只能有一个controller,当集群启动的时候,所有的broker都会参与到controller的竞选,最终只能有一个broker胜出。
Controller维护的状态分为两类:1:管理每一台Broker上对应的分区副本。2:管理每一个Topic分区的状态。
KafkaController 核心代码,其中包含副本状态机和分区状态机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)KafkaController中共定义了五种selector选举器
1
2
3
4
5
6
7
8
9
101、ReassignedPartitionLeaderSelector
从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
2、PreferredReplicaPartitionLeaderSelector
如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。
3、ControlledShutdownLeaderSelector
将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。
4、NoOpLeaderSelector
原则上不做任何事情,返回当前的leader和isr。
5、OfflinePartitionLeaderSelector
从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。kafka修改分区和副本数
1
2
3
4
5
6../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: test1 Partition: 1 Leader: 3 Replicas: 3,5 Isr: 3,5
Topic: test1 Partition: 2 Leader: 4 Replicas: 4,1 Isr: 4,1topic 分区扩容
1
2./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
复制代码
ReplicaStateMachine (ZK持久化副本分配方案)
Replica有7种状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
151 NewReplica: 在partition reassignment期间KafkaController创建New replica
2 OnlineReplica: 当一个replica变为一个parition的assingned replicas时
其状态变为OnlineReplica, 即一个有效的OnlineReplica
3 Online状态的parition才能转变为leader或isr中的一员
4 OfflineReplica: 当一个broker down时, 上面的replica也随之die, 其状态转变为Onffline;
ReplicaDeletionStarted: 当一个replica的删除操作开始时,其状态转变为ReplicaDeletionStarted
5 ReplicaDeletionSuccessful: Replica成功删除后,其状态转变为ReplicaDeletionSuccessful
6 ReplicaDeletionIneligible: Replica成功失败后,其状态转变为ReplicaDeletionIneligible
7 NonExistentReplica: Replica成功删除后, 从ReplicaDeletionSuccessful状态转变为NonExistentReplica状态ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
startup: 启动ReplicaStateMachine
initializeReplicaState: 初始化每个replica的状态, 如果replica所在的broker是live状态,则此replica的状态为OnlineReplica。
处理可以转换到Online状态的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 并且发送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
当创建某个topic时,该topic下所有分区的所有副本都是NonExistent。
当controller加载Zookeeper中该topic每一个分区的所有副本信息到内存中,同时将副本的状态变更为New。
之后controller选择该分区副本列表中的第一个副本作为分区的leader副本并设置所有副本进入ISR,然后在Zookeeper中持久化该决定。
一旦确定了分区的Leader和ISR之后,controller会将这些消息以请求的方式发送给所有的副本。
同时将这些副本状态同步到集群的所有broker上以便让他们知晓。
最后controller 会把分区的所有副本状态设置为Online。
partitionStateMachine (根据副本分配方案创建分区)
Partition有如下四种状态
1
2
3
4NonExistentPartition: 这个partition还没有被创建或者是创建后又被删除了;
NewPartition: 这个parition已创建, replicas也已分配好,但leader/isr还未就绪;
OnlinePartition: 这个partition的leader选好;
OfflinePartition: 这个partition的leader挂了,这个parition状态为OfflinePartition;当创建Topic时,controller负责创建分区对象,它首先会短暂的将所有分区状态设置为NonExistent。
之后读取Zookeeper副本分配方案,然后令分区状态设置为NewPartion。
处于NewPartion状态的分区尚未有leader和ISR,因此Controller会初始化leader和ISR信息并设置分区状态为OnlinePartion,此时分区正常工作。
Controller职责所在(监听znode状态变化做执行)
- UpdateMetadataRequest:更新元数据请求(比如:topic有多少个分区,每一个分区的leader在哪一台broker上以及分区的副本列表),随着集群的运行,这部分信息随时都可能变更,一旦发生变更,controller会将最新的元数据广播给所有存活的broker。具体方式就是给所有broker发送UpdateMetadataRequest请求
- CreateTopics: 创建topic请求。当前不管是通过API方式、脚本方式(–create)抑或是CreateTopics请求方式来创建topic,做法几乎都是在Zookeeper的/brokers/topics下创建znode来触发创建逻辑,而controller会监听该path下的变更来执行真正的“创建topic”逻辑
- DeleteTopics:删除topic请求。和CreateTopics类似,也是通过创建Zookeeper下的/admin/delete_topics/节点来触发删除topic,主要逻辑有:1:停止所有副本运行。2:删除所有副本的日志数据。3:移除zk上的 /admin/delete_topics/节点。
- 分区重分配:即kafka-reassign-partitions脚本做的事情。同样是与Zookeeper结合使用,脚本写入/admin/reassign_partitions节点来触发,controller负责按照方案分配分区。执行过程是:先扩展再伸缩机制(旧副本和新副本集合同时存在)。
- Preferred leader分配:调整分区leader副本,preferred leader选举当前有两种触发方式:1. 自动触发(auto.leader.rebalance.enable = true),controller会自动调整Preferred leader。2. kafka-preferred-replica-election脚本触发。两者步骤相同,都是向Zookeeper的/admin/preferred_replica_election写数据,controller提取数据执行preferred leader分配
- 分区扩展:即增加topic分区数。标准做法也是通过kafka-reassign-partitions脚本完成,不过用户可直接往Zookeeper中写数据来实现,比如直接把新增分区的副本集合写入到/brokers/topics/下,然后controller会为你自动地选出leader并增加分区
- 集群扩展:新增broker时Zookeeper中/brokers/ids下会新增znode,controller自动完成服务发现的工作
- broker崩溃:同样地,controller通过Zookeeper可实时侦测broker状态。一旦有broker挂掉了,controller可立即感知并为受影响分区选举新的leader
- ControlledShutdown:broker除了崩溃,还能“优雅”地退出。broker一旦自行终止,controller会接收到一个ControlledShudownRequest请求,然后controller会妥善处理该请求并执行各种收尾工作
- Controller leader选举:controller必然要提供自己的leader选举以防这个全局唯一的组件崩溃宕机导致服务中断。这个功能也是通过Zookeeper的帮助实现的。
Controller与Broker之间的通信机制(NIO select)
- controller启动时会为集群中的所有Broker创建一个专属的Socket连接,假如有100台broker机器,那么controller会创建100个Socket连接。新版本目前统一使用NIO select ,实际上还是要维护100个线程。
ControllerContext数据组件
- controller的缓存,可谓是最重要的数据组件了,ControllerContext汇总了Zookeeper中关于kafka集群中所有元数据信息,是controller能够正确提供服务的基础。
总结
kafka集群Controller主要通过ZK持久化副本分配方案,根据副本分配方案创建分区,监听ZK znode状态变化做执行处理,维护分区和副本ISR机制稳定运行。
- 本文链接:https://gjtmaster.github.io/2018/09/21/kafka集群Controller竞选与责任设计思路架构详解/
- 版权声明:The author owns the copyright, please indicate the source reproduced.