第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题( 三 )


指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量 。发生错误时,可能会造成数据的发送顺序改变,默认是 5 (修改) 。
如果需要保证消息在一个分区上的严格顺序,这个值应该设为 1 。不过这样会严重影响生产者的吞吐量 。
request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常,默认 30 秒 。
metadata.fetch.timeout.ms
是指我们所获取的一些元数据的第一个时间数据 。元数据包含:topic,host,partitions 。此项配置是指当等待元数据 fetch 成功完成所需要的时间,否则会跑出异常给客户端
max.request.size
控制生产者发送请求最大大小 。默认这个值为 1M,如果一个请求里只有一个消息,那这个消息不能大于 1M,如果一次请求是一个批次,该批次包含了 1000 条消息,那么每个消息不能大于 1KB 。注意:broker 具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产者的这个设置,会导致消息被拒绝 。这个参数和 Kafka 主机的 message.max.bytes 参数有关系 。如果生产者发送的消息超过 message.max.bytes 设置的大小,就会被 Kafka 服务器拒绝 。
以上参数不用全记住,一般来说,就记住 acks、batch.size、linger.ms、max.request.size 就行了,因为这 4 个参数重要些,其他参数一般没有太大必要调整
2.4.顺序保证Kafka 可以保证同一个分区里的消息是有序的 。也就是说,发送消息时,主题只有且只有一个分区,同时生产者按照一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们 。在某些情况下,顺序是非常重要的 。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感 。

第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
如果把 retires 设为非零整数,同时把
max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次 。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了 。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retires 设为 0(不重试的话消息可能会因为连接关闭等原因会丢)。所以还是需要重试,同时把
max.in.flight.request.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发
送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做 。
2.5.序列化创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景 。我们完全可以自定义序列化器 。只要实现
org.apache.kafka.common.serialization.Serializer 接口即可 。
2.5.1.自定义序列化需要考虑的问题自定义序列化容易导致程序的脆弱性 。举例,在我们上面的实现里,我们有多种类型的消费者,每个消费者对实体字段都有各自的需求,比如,有的将字段变更为 long 型,有的会增加字段,这样会出现新旧消息的兼容性问题 。特别是在系统升级的时候,经常会出现一部分系统升级,其余系统被迫跟着升级的情况 。
解决这个问题,可以考虑使用自带格式描述以及语言无关的序列化框架 。比如 Protobuf,或者 Kafka 官方推荐的 Apache Avro 。
Avro 会使用一个 JSON 文件作为 schema 来描述数据,Avro 在读写时会用到这个 schema,可以把这个 schema 内嵌在数据文件中 。这样,不管数据格式如何变动,消费者都知道如何处理数据 。
但是内嵌的消息,自带格式,会导致消息的大小不必要的增大,消耗了资源 。我们可以使用 schema 注册表机制,将所有写入的数据用到的 schema保存在注册表中,然后在消息中引用 schema 的标识符,而读取的数据的消费者程序使用这个标识符从注册表中拉取 schema 来反序列化记录 。
注意:Kafka 本身并不提供 schema 注册表,需要借助第三方,现在已经有很多的开源实现,比如 Confluent Schema Registry,可以从 GitHub 上获取 。
如何使用参考如下网址:
https://cloud.tencent.com/developer/article/1336568
第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
2.6.分区我们在新增 ProducerRecord 对象中可以看到,ProducerRecord 包含了目标主题,键和值,Kafka 的消息都是一个个的键值对 。键可以设置为默认的 null 。


推荐阅读