第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题( 二 )


文章插图
 
2.2.3.异步发送实现接口
org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给 send 方法 。

第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
2.3.更多发送配置生产者有很多属性可以设置,大部分都有合理的默认值,无需调整 。有些参数可能对内存使用,性能和可靠性方面有较大影响 。可以参考
org.apache.kafka.clients.producer 包下的 ProducerConfig 类 。
第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
acks:
Kafk 内部的复制机制是比较复杂的,这里不谈论内部机制(后续章节进行细讲),我们只讨论生产者发送消息时与副本的关系 。
指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响 。
acks=0:生产者在写入消息之前不会等待任 何来自服务器的响应,容易丢消息,但是吞吐量高 。
acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应 。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息 。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失 。默认使用这个配置 。
acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应 。延迟高 。
金融业务,主备外加异地灾备 。所以很多高可用场景一般不是设置 2 个副本,有可能达到 5 个副本,不同机架上部署不同的副本,异地上也部署一套副本 。
buffer.memory
设置生产者内存缓冲区的大小(结合生产者发送消息的基本流程),生产者用它缓冲要发送到服务器的消息 。如果数据产生速度大于向 broker 发送的速度,导致生产者空间不足,producer 会阻塞或者抛出异常 。缺省 33554432 (32M)
max.block.ms
指定了在调用 send()方法或者使用 partitionsFor()方法获取元数据时生产者的阻塞时间 。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞 。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常 。缺省 60000ms
retries
发送失败时,指定生产者可以重发消息的次数(缺省 Integer.MAX_VALUE) 。默认情况下,生产者在每次重试之间等待 100ms,可以通过参数retry.backoff.ms 参数来改变这个时间间隔 。
receive.buffer.bytes 和 send.buffer.bytes
指定 TCP socket 接受和发送数据包的缓存区大小 。如果它们被设置为-1,则使用操作系统的默认值 。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽 。缺省 102400
batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里 。该参数指定了一个批次可以使用的内存大小,按照字节数计算 。当批次内存被填满后,批次里的所有消息会被发送出去 。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送 。缺省16384(16k),如果一条消息超过了批次的大小,会写不进去 。
linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间 。它和 batch.size 以先到者为先 。也就是说,一旦我们获得消息的数量够 batch.size 的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比 batch.size 设置要小的多,我们需要“linger”特定的时间以获取更多的消息 。这个设置默认为 0,即没有延迟 。设定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟,但也会提升消息的吞吐量 。
compression.type
producer 用于压缩数据的压缩类型 。默认是无压缩 。正确的选项值是 none、gzip、snAppy 。压缩最好用于批量处理,批量处理消息越多,压缩性能越好 。snappy 占用 cpu 少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个 。如果带宽紧张,用 gzip,会占用较多的 cpu,但提供更高的压缩比 。
client.id
当向 server 发出请求时,这个字符串会发送给 server 。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息 。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪 。
max.in.flight.requests.per.connection


推荐阅读