Kafka 消息格式变迁

从0.8.x版本开始到现在的1.1.x版本,Kafka的消息格式也经历了3个版本。每次版本的改变,都预示着新的优化。那么Broker作为Kafka服务载体,承担了消息协议的响应和接收。

  • 持久化消息。
  • 把消息从发送端过渡到消费端。

JVM消息重排机制

  • Java内存模型保存对象的开销很大,甚至可能需要花费比消息大两倍的空间来保存数据。为了降低这种开销,JMM(Java Memory Model)会对用户自定义的类进行字段重排。
  • 垃圾回收随着堆上数据的扩张,会从整体上拖累应用程序的吞吐量。
  • JMM要求对象必须按照8字节对齐,未对齐的部分会填充空白字符进行补齐padding。

img

  • 对齐填充计算方法:HotSpot 的对齐方式为 8 字节对齐,不足的需要 Padding 填充对齐, 公式:

    1
    (对象头 + 实例数据 + padding)% 8 == 00<= padding <8
  • 对一个java对象,至少需要16字节对象头部(对于64位JVM对象通常由8字节的Word组成)。

Kafka 轻装上阵对象存储

  • kafka采用Java NIO的ByteBuffer来保存消息,同时依赖文件系统提供的页缓存机制,不再依赖java的堆缓存。悖论:写文件系统时,如果java的堆缓存保存一份对象,那么页缓存还会保存一份,何必呢?
  • ByteBuffer是紧凑的二进制字节结构,不需要padding,因此可以省去很多不必要的内存开销。
  • 在一个64G内存的机器上,kafka可以使用内存到58-62GB之间,不用担心Java GC 。
  • ByteBuffer可以节省大量空间,相比于java的堆缓存方案。

V0(消息元祖)=> 14字节+12(LOG_OVERHEAD)

img

  • 版本号: V0版本magic=0,V1版本magic=1,V2版本magic=2

  • 属性:消息压缩类型。目前仅支持3种压缩方法。

    1
    0X00 未启动压缩 	0x01 GZIP  	0x02 Snappy  	0x03 LZ4
  • 注意key长度字段和value长度字段是固定的,没有也占用4个字节,来存 -1

  • 除了key值和value值外,可以统称为是消息头部信息(header),总共占用14字节。

    1
    2
    3
    假设:存在Key值 为key , value值为 value  (一个字符一个字节,共8个字节)
    则 header 14 字节 + 值 8字节 = 22字节
    key为空时,则占用 19字节
  • 日志头部(LOG_OVERHEAD):每个Record(v0和v1版)必定对应一个offset和message size。每条消息都一个offset用来标志它在partition中的偏移量,这个offset是逻辑值,而非实际物理偏移值,message size表示消息的大小,这两者的一起被称之为日志头部(LOG_OVERHEAD),固定为12B

4.1 V0集合(被V2Batch取代)

img

总结:一条消息必定包含LOG_OVERHEAD和消息体两部分。最小占用12B+14B=26B,在不包含key值和Value值的情况下。

若key =key , value=value 则占用26(纯格式)+8(值空间)=34B

1
2
3
4
5
6
7
首先创建一个partition数和副本数都为1的topic,名称为“msg_format_v0”,
然后往msg_format_v0中发送一条key=”key”,value=”value”的消息,之后查看对应的日志:

-rw-r--r-- 1 root root 34 Apr 26 02:52 00000000000000000000.log

再次插入一条key=null, value=”value”的消息:
-rw-r--r-- 1 root root 65 Apr 26 02:56 00000000000000000000.log

img

总结:发送每一条消息必须携带12字节LOG_OVERHEAD,是分散的消息格式设计,没有体现集合的味道。

V1(消息戳进阶)=> 22字节+12(LOG_OVERHEAD)

img

  • kafka从0.10.0版本开始到0.11.0版本之前所使用的消息格式版本为v1,其比v0版本就多了一个timestamp字段,表示消息的时间戳

img

  • 因此像v0版本介绍的一样发送一条key=”key”,value=”value”的消息,那么此条消息在v1版本中会占用42B

举例如下:发送第一条key=”key”,value=”value”的消息,则占用22+12+8=42B发送第二条key=null,value=”value”的消息,,则占用12+22+5=39B

合在一起发则为:42+39=81B

5.1 V1集合(被V2Batch取代)

img

V2(变长整型与ZigZag) => 7个字节+ 值key+ 值value =15字节

  • kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。
  • Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。
  • zig-zags 会固定的将每一个字节的第一位留作特殊用途,表明该字节是否是最后一个字节,若最高位是1,表示编码尚未结束。因此实际上也仅有7位用于实际的编码,即0-127。另外考虑 -1 ,1, -2, 2 对应 1,2, 3, 4。因此,0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节)。
  • 注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节),一个int64最长会占用10字节(大于默认的8字节)

img

总结 :v2版本的消息格式去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且attributes被弃用。

6.1 V2 Record Batch => 61字节+7字节(纯格式)+ 值key+ 值value =76字节

v2版本对于消息集(RecordBatch)做了彻底的修改,总共占用了61个字节,比如:把crc校验放在了Batch这一层,幂等性引入,使用PID标识。epoch引入,标识当前版本。看似增大了消息的容量大小,从大规模消息来算的话,却带来了质的飞跃,因为一条纯消息格式仅占用7字节了,而V1占用22字节,V0占用14字节。

img

1
2
3
4
5
6
7
8
9
10
11
12
first offset:表示当前RecordBatch的起始位移。
length:计算partition leader epoch到headers之间的长度。
partition leader epoch:用来确保数据可靠性,详细可以参考KIP-101
magic:消息格式的版本号,对于v2版本而言,magic等于2
attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用来支持事务功能。
last offset delta:RecordBatch中最后一个Record的offsetfirst offset的差值。主要被broker用来确认RecordBatch中Records的组装正确性。
first timestamp:RecordBatch中第一条Record的时间戳。
max timestamp:RecordBatch中最大的时间戳,一般情况下是指最后一个Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性。
producer id:用来支持幂等性,详细可以参考KIP-98
producer epoch:和producer id一样,用来支持幂等性。
first sequence:和producer id、producer epoch一样,用来支持幂等性。
records count:RecordBatch中Record的个数。

升华

V1 版本的弊端:

  • 空间利用率低,key和value长度都各占4个字节,浪费。
  • 只保留最新位移,导致若获取第一条消息,就需要压缩解压缩后,遍历出第一条数据。
  • 每一个消息都会有CRC校验。
  • 没有消息长度的概念。

V2 新型架构优点:

  • 增加消息总长度
  • 保留时间戳,同时仅使用1个字节来保存,比如:10条消息,V2需要100个字节。V1就需要800个字节了。-保存位移增量。
  • 去除CRC校验。
  • 废除attribute,移到外层目录。