第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题( 五 )


分区分配给消费者的策略 。系统提供两种策略 。默认为 Range 。允许自定义策略 。
Range
把主题的连续分区分配给消费者 。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)
RoundRobin
把主题的分区循环分配给消费者 。

第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
自定义策略
extends 类 AbstractPartitionAssignor,然后在消费者端增加参数:
properties.put(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 类.class.getName());即可 。
max.poll.records
控制每次 poll 方法返回的的记录数量 。
fetch.min.bytes
每次 fetch 请求时,server 应该返回的最小字节数 。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回 。缺省为 1 个字节 。多消费者下,可以设大这个值,以降低 broker 的工作负载
fetch.wait.max.ms
如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前,server 会阻塞的最大时间 。缺省为 500 个毫秒 。和上面的fetch.min.bytes 结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足 。
max.partition.fetch.bytes
指定了服务器从每个分区里返回给消费者的最大字节数,默认 1MB 。假设一个主题有 20 个分区和 5 个消费者,那么每个消费者至少要有 4MB 的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大 。注意,这个参数要比服务器的 message.max.bytes 更大,否则消费者可能无法读取消息 。
session.timeout.ms
如果 consumer 在这段时间内没有发送心跳信息,则它会被认为挂掉了 。默认 3 秒 。
client.id
当向 server 发出请求时,这个字符串会发送给 server 。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息 。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪 。
receive.buffer.bytes 和 send.buffer.bytes
指定 TCP socket 接受和发送数据包的缓存区大小 。如果它们被设置为-1,则使用操作系统的默认值 。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽 。
3.4.消费者中的基础概念消费者的含义,同一般消息中间件中消费者的概念 。在高并发的情况下,生产者产生消息的速度是远大于消费者消费的速度,单个消费者很可能会负担不起,此时有必要对消费者进行横向伸缩,于是我们可以使用多个消费者从同一个主题读取消息,对消息进行分流 。
(买单的故事,群组,消费者的一群人,消费者:买单的,分区:一笔单,一笔单能被买单一次,当然一个消费者可以买多个单,如果有一个消费者挂掉了<跑单了>,另外的消费者接上)
第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
3.4.1.订阅创建消费者后,使用 subscribe()方法订阅主题,这个方法接受一个主题列表为参数,也可以接受一个正则表达式为参数;正则表达式同样也匹配多个主题 。如果新创建了新主题,并且主题名字和正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题 。比如,要订阅所有和 test相关的主题,可以 subscribe(“tets.*”)
3.4.2.轮询为了不断的获取消息,我们要在循环中不断的进行轮询,也就是不停调用 poll 方法 。
poll 方法的参数为超时时间,控制 poll 方法的阻塞时间,它会让消费者在指定的毫秒数内一直等待 broker 返回数据 。poll 方法将会返回一个记录(消息)列表,每一条记录都包含了记录所属的主题信息,记录所在分区信息,记录在分区里的偏移量,以及记录的键值对 。
poll 方法不仅仅只是获取数据,在新消费者第一次调用时,它会负责查找群组,加入群组,接受分配的分区 。如果发生了再均衡,整个过程也是在轮询期间进行的 。
3.4.3.提交和偏移量当我们调用 poll 方法的时候,broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用 Kafka 来追踪消息在分区里的位置,我们称之为 偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为 提交 。
消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset 的特殊主题发送一个消息,里面会包括每个分区的偏移量 。
3.5.消费者中的核心概念3.5.1.多线程安全问题KafkaConsumer 的实现不是线程安全的,所以我们在多线程的环境下,使用 KafkaConsumer 的实例要小心,应该每个消费数据的线程拥有自己的


推荐阅读