kafka rebalance机制与Consumer多种消费模式案例
Producer端基本数据结构
ProducerRecord: 一个ProducerRecord表示一条待发送的消息记录,主要由5个字段构成:
1
2
3
4
5topic 所属topic
partition 所属分区
key 键值
value 消息体
timestamp 时间戳RecordMetadata: Kafka服务器端返回给客户端的消息的元数据信息,前3项相对比较重要,Producer端可以使用这些消息做一些消息发送成功之后的处理。
1
2
3
4
5
6offset 该条消息的位移
timestamp 消息时间戳
topic + partition 所属topic的分区
checksum 消息CRC32码
serializedKeySize 序列化后的消息键字节数
serializedValueSize 序列化后的消息体字节数
Producer端消息发送流程
在send()的发送消息动作触发之前,通过props属性中指定的servers连接到broker集群,从Zookeeper收集集群Metedata信息,从而了解哪些broker掌管哪一个Topic的哪一个partition,以及brokers的健康状态。
下面就是流水线操作,ProducerRecord对象携带着topic,partition,message等信息,在Serializer这个“车间”被序列化。
序列化过后的ProducerRecord对象进入Partitioner“车间”,按照上文所述的Partitioning 策略决定这个消息将被分配到哪个Partition中。
确定partition的ProducerRecord进入一个缓冲区,通过减少IO来提升性能,在这个“车间”,消息被按照TopicPartition信息进行归类整理,相同Topic且相同parition的ProducerRecord被放在同一个RecordBatch中,等待被发送。什么时候发送?都在Producer的props中被指定了,有默认值,显然我们可以自己指定。
1
2
3
4(1) batch.size:设置每个RecordBatch可以缓存的最大字节数
(2) buffer.memory:设置所有RecordBatch的总共最大字节数
(3) linger.ms设置每个RecordBatch的最长延迟发送时间
(4) max.block.ms 设置每个RecordBatch的最长阻塞时间一旦,当单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,这个 RecordBatch会被立即发送。另外,如果所有RecordBatch作为一个整体,达到了buffer.memroy或者max.block.ms上限,所有的RecordBatch都会被发送。
ProducerRecord消息按照分配好的Partition发送到具体的broker中,broker接收保存消息,更新Metadata信息,同步给Zookeeper。
Producer端其他优化点:
1
2
3
4
5(5) acks:Producer的数据确认阻塞设置,0表示不管任何响应,只管发,发完了立即执行下个任务,这种方式最快,但是很不保险。1表示只确保leader成功响应,接收到数据。2表示确保leader及其所有follwer成功接收保存消息,也可以用”all”。
(6) retries:消息发送失败重试的次数。
(7) retry.backoff.ms:失败补偿时间,每次失败重试的时间间隔,不可设置太短,避免第一条消息的响应还没返回,第二条消息又发出去了,造成逻辑错误。
(8) max.in.flight.request.per.connection:同一时间,每个Producer能够发送的消息上限。
(9) compression.type producer所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
消息缓冲区(accumulator)再剖析
producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。
该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。
假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{“test-0” -> [batch1, batch2], “test-1” -> [batch3]}
每个batch中最重要的3个组件包括:
1
2
3compressor: 负责执行追加写入操作
batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
thunks:保存消息回调逻辑的集合
Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:
1
2
3
4(1)不断轮询缓冲区寻找已做好发送准备的分区
(2)将轮询获得的各个batch按照目标分区所在的leader broker进行分组
(3)将分组后的batch通过底层创建的Socket连接发送给各个broker
(4)等待服务器端发送response回来
- Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法
总结
- Sender线程自KafkaProducer创建后就一直都在运行着,单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,作为后台线程就会检测到立即发送。
- accumulator缓冲器按照Topic partion进行分组,来进行集中向某一个Broker发送。
- 本文链接:https://gjtmaster.github.io/2018/09/19/kafka集群Producer基本数据结构及工作流程/
- 版权声明:The author owns the copyright, please indicate the source reproduced.