持久性

  • 持久性定义了kafka集群中消息不容易丢失的程度,持久性越高表明Kafka越不会丢失消息。
  • 持久性通常是通过冗余来实现,而kafka实现冗余的手段就是备份机制,该备份机制保证了Kafka的每一个消息都会被保存到多台Broker上。

实际可行性调优

  • 分区数和副本因子由broker端参数num.partitions和default.replication.factor指定,这两个参数默认值都是1。
  • 禁止kafka自动创建topic,可以参数设置auto.create.topics.enable=false。
  • 新版本已经把consumer端位移信息的存储从Zookeeper转移到了内部的topic consumer_offsets中,注意这个也同样会受到default.replication.factor参数影响。0.11.0.0要求consumer_offsets创建时必须满足default.replication.factor,若Broker小于default.replication.factor会直接报错。
  • 若要达到最高的持久性,必须设置acks=all(或者acks=-1),即强制leader副本等待的ISR中所有副本都响应了某条消息后发送response给producer。
  • producer发送失败后,会进行重试机制,比如网络临时抖动。
  • 当Producer重试后,待发送的消息可能已经成功,假如数据已经落盘写入日志,在发送response给Producer时出现了故障,重试时就会再次发送同一条消息,也就出现了重复的消息。自从0.11.0.0版本开始,kafka提供了幂等性Producer,实现了精确一次的处理语义。幂等性Producer保证同一条消息只会被broker写入一次。启动这样设置:enable.idempotence=true。
  • 消息乱序处理机制,producer依次发送消息m1和m2,若m1写入失败但m2写入成功,一旦重试m1,那么m1就位于m2的后面了。如果对消息的顺序要求比较严格的话,max.in.flight.requests.per.connection=1来规避这个问题。该参数目的保证单个producer连接上在任意某个时刻只处理一个请求。
  • 当某个Broker出现出现崩溃时,controller会自动检测出宕机的broker并为该Broker上的所有分区重新选举leader,选择的范围是从ISR副本中挑选。若ISR副本所在的broker全部宕机,Kafka就会从非ISR副本中选举leader。这种情况下就会造成不同步和消息丢失。是否允许Unclean选举,借助unclean.leader.election.enable参数控制,0.11.0.0版本默认是false。
  • min.insync.replicas,若成功写入消息时则必须等待响应完成的最少ISR副本,该参数是在acks=all时才会生效。
  • 自0.10.0.0版本开始,kafka仿照Hadoop为broker引入机架属性信息(rack),该参数由broker.rack设定。Kafka集群在后台会收集所有的机架信息仔仔创建分区时将分区分散到不同的机架上。
  • 日志持久化:两个重要参数log.flush.interval.ms和log.flush.interval.message。前者指定了Kafka多长时间执行一次消息“落盘”,后者是写入多少消息后执行一次消息“落盘”。默认情况下,log.flush.interval.ms是空而将log.flush.interval.message设置为Long.MAX_VALUE,这个表示Kafka实际上不会自动执行消息的“冲刷”操作,事实上这也是kafka的设计初衷。即把消息同步到磁盘的工作交由操作系统来完成,由操作系统控制页缓存数据到物理文件的同步。但是若用户希望每一条消息都冲刷一次,及时持久化,可以设置log.flush.interval.message=1。
  • 提交位移的持久化,实际应用中设置auto.commit.enable=false,同时用户需要使用commitSync方法来提交位移,而非commitAsync方法。

参数清单

broker端

  • 设置unclean.leader.election.enable=false(0.11.0.0之前版本)

  • 设置auto.create.topics.enable=false

  • 设置replication.factor=3,min.insync.replicas=1

  • 设置default.replication.factor=3

  • 设置broker.rack属性分散分区数据到不同的机架。

  • 设置log.flush.interval.ms和log.flush.interval.message为一个较小的值。

    producer端

  • 设置acks=all

  • 设置retries为一个较大的值,比如10-30。

  • 设置max.in.flight.request.per.connection=1

  • 设置enable.idempotence=true

consumer端

  • 设置auto.commit.enable=false
  • 消息消费成功后,调用commitSync提交位移。

总结

enable.idempotence=true 和max.in.flight.request.per.connection=1以及broker.rack属性比较新颖,值得一试。