Kafka的Message格式( 二 )


Kafka的Message格式

文章插图
三、Record
在Kafka中 , Record Batch和Record是两种不同的数据结构 , 但它们之间存在着紧密的关系 。
Record是指Kafka中的一条消息 , 通常由Key、Value、Timestamp等字段组成 。而Record Batch是指将多个相关的Record打包在一起进行传输和处理的数据结构 , 每个Record Batch通常包含若干条记录 , 并且这些记录具有相同的key、value类型和所属的topic和partition 。
具体来说 , 每个Record Batch中的Record都被依次存储在一个连续的二进制数据块中 , 每个Record包含自己的Header和Body部分 。而Record Batch则包含了当前Batch的元数据信息和所有记录的元数据信息 , 如Batch Size、First Offset、Last Offset、CRC校验值等 。
消息格式如下所示:
#消息长度
length: varint
#消息属性
attributes: int8
# 时间戳增量
bit 0~7: unusedtimestampDelta: varlong
#偏移量增量
offsetDelta: varint
#key长度
keyLength: varint
#key值
key: byte[]
#value长度
valueLen: varint
#value值
value: byte[]
#header信息
Headers => [Header]
Record信息通过如下方式封装
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
// 消息总数
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
// 属性
byte attributes = 0; // there are no used record attributes at the moment
out.write(attributes);
// 时间增量
ByteUtils.writeVarlong(timestampDelta, out);
// 位移增量
ByteUtils.writeVarint(offsetDelta, out);
// key
if (key == null) {
ByteUtils.writeVarint(-1, out);
} else {
int keySize = key.remAIning();
// key size
ByteUtils.writeVarint(keySize, out);
// key
Utils.writeTo(out, key, keySize);
}
// Value
if (value =https://www.isolves.com/it/cxkf/rongqi/2023-06-28/= null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
// value size
ByteUtils.writeVarint(valueSize, out);
// value
Utils.writeTo(out, value, valueSize);
}
// header
ByteUtils.writeVarint(headers.length, out);
for (Header header : headers) {
// header key
String headerKey = header.key();
byte[] utf8Bytes = Utils.utf8(headerKey);
// header key 长度
ByteUtils.writeVarint(utf8Bytes.length, out);
// header key 值
out.write(utf8Bytes);
// header value
byte[] headerValue = https://www.isolves.com/it/cxkf/rongqi/2023-06-28/header.value();
if (headerValue =https://www.isolves.com/it/cxkf/rongqi/2023-06-28/= null) {
ByteUtils.writeVarint(-1, out);
} else {
// header value 长度
ByteUtils.writeVarint(headerValue.length, out);
// header value 值
out.write(headerValue);
}
}
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
根据以上代码逻辑 , 用以下图表示 V2 版本消息格式的样子:
Kafka的Message格式

文章插图
四、总结
message(又称record)总是分批写入的 。一批消息的技术术语是一个record batch:
 
  • 一个record batch包含一个或多个record 。
  • 在退化的情况下 , 我们可以有一个包含单个record的record batch 。
  • record batch和record有它们自己的headers 。

【Kafka的Message格式】


推荐阅读