Kafka的Message格式

消息引擎的核心职责就是将生产者生产的消息传输到消费者 , 设计消息格式是各大消息引擎框架的关键问题 , 因为消息格式决定了消息引擎的性能和效率 。本文带大家探究消息引擎kafka当前所用的message格式是什么 。
一、Kafka message format
kafka从0.11.0版本开始所使用的消息格式版本为v2 , 参考了 Protocol Buffer而引入了变长整型(Varints)和 ZigZag 编码 。Varints是使用一个或多个字节来序列化整数的一种方法 , 数值越小 , 其所占用的字节数就越少 。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间 ,  以使得带符号整数映射为无符号整数 , 这样可以使得绝对值较小的负数仍然享有较小的Varints编码值 , 比如-1编码为1,1编码为2 , -2编码为3 。

Kafka的Message格式

文章插图
kafka v0和v1版本的消息格式 , 如果消息本身没有key , 那么key length字段为-1 , int类型的需要4个字节来保存 , 而如果采用Varints来编码则只需要一个字节 。根据Varints的规则可以推导出0-63之间的数字占1个字节 , 64-8191之间的数字占2个字节 , 8192-1048575之间的数字占3个字节 。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节) , 如果消息格式中与长度有关的字段采用Varints的编码的话 ,  绝大多数情况下都会节省空间 , 而v2版本的消息格式也正是这样做的 。不过需要注意的是Varints并非一直会省空间 , 一个int32最长会占用5个字节(大于默认的4字节) ,  一个int64最长会占用10字节(大于默认的8字节) 。
因为Kafka的message经历过几次的版本迭代更改 , 本文以v2版本为例讲述 。
二、Record Batch
在Kafka中 , 数据是按照topic和partition的方式进行组织和存储的 。每个partition的数据被分成一个或多个segment文件 , 并且每个segment文件包含若干个Record Batch 。因此 , Record Batch也是Kafka中重要的数据结构之一 。
在Kafka中 , Record Batch指的是一组相关的消息集合 , 它们具有相同的key、value类型和所属的topic和partition 。每个Record Batch包含若干条消息(Record) , 并且这些消息被顺序地写入到磁盘中 , 以提高读取效率 。
具体而言 , Record Batch由以下几部分构成:
Record Batch Header:包含了当前Batch的元数据 , 如Magic Code、Batch Size、First Offset等信息 。
Record Header:每个Record都附带有一个Header , 用于描述该Record的元数据信息 , 例如时间戳、压缩类型、CRC校验值等 。
Record Body:记录具体的消息内容 , 包括Key、Value等字段 。
需要注意的是 , Kafka的Record Batch通常具有比较大的体积(默认大小为16KB) , 因此可以将多个相关的消息打包在一起进行传输和处理 , 从而提高了消息的传输效率和吞吐量 。另外 , Kafka还支持对Record Batch进行压缩和批量操作 , 以进一步提高数据的传输效率和性能 。
总的来说 , Record Batch是Kafka中定义的一个重要数据结构 , 用于管理和组织消息 , 提高消息的读写效率和传输性能 。
baseoffset: int64 标识当前的batch的起始偏移量
batchLength: int32 该batch的长度
partitionLeaderEpoch: int32 确保数据可靠性
magic: int8 魔法数字 , 当前为2 , 也即当前的message版本为v2版本
crc: int32 crc校验
attributes: int16 消息属性
bit 0~2: 是否压缩和压缩的格式
0: no compression
1: gzip
2: snAppy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32 RecordBatch中最后一个Record的offset与first offset的差值
baseTimestamp: int64 第一条时间戳
maxTimestamp: int64 最大的时间戳 , 保证消息组装时的正确性
producerId: int64 支持幂等性
producerEpoch: int16 支持幂等性
baseSequence: int32 支持幂等性 , 消息序号
records: [Record] Record个数
用以下图表示 V2 版本消息批次的样子:


推荐阅读