消费者的测试类:
/** * @ClassName MyConsumerTest * @Description TODO * @Author lingxiangxiang * @Date 4:23 PM * @Version 1.0 **/public class MyConsumerTest { public static void main(String[] args) { MyKafkaConsumer consumer = new MyKafkaConsumer(); consumer.start(); System.out.println("=================="); }}
文章插图
②消费者同步手动提交
前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交 可能会出现消息重复消费的情况 。
所以在生产环境下,很多时候需要对 Offset 进行手动提交,以解决重复消费的问题 。
手动提交又可以划分为同步提交、异步提交,同异步联合提交 。这些提交方式仅仅是 doWork() 方法不相同,其构造器是相同的 。
所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现三种不同的提交方式 。
同步提交方式是,消费者向 Broker 提交 Offset 后等待 Broker 成功响应 。若没有收到响应,则会重新提交,直到获取到响应 。
而在这个等待过程中,消费者是阻塞的 。其严重影响了消费者的吞吐量 。
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/public class MyKafkaConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public MyKafkaConsumer() { super("KafkaConsumerTest", false); Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("group.id", "mygroup"); // 这里要修改成手动提交 properties.put("enable.auto.commit", "false"); // properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("heartbeat.interval.ms", "10000"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { consumer.subscribe(Arrays.asList("test2")); ConsumerRecords<Integer, String>records = consumer.poll(1000); for (ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); //手动同步提交 consumer.commitSync(); } }}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 带你了解黑客使用的命令行浏览器,令人吃惊
- 10分钟教你Python+MySQL数据库操作
- 机架式服务器的详解,一文带你读懂
- 一文带你了解IPsec VPN基本原理与配置流程,干货值得收藏
- 带你了解香港服务器和美国服务器的区别
- 10分钟将你的Go工程转换为Go Module模式
- 带你了解太极拳的健身原理是什么
- 黄瓜|春天减肥,试试这5道“刮油菜”,10分钟端上桌,好吃解腻
- 齐刘海|素颜也好看的女生是什么样子的?女生护肤?带你get护肤小步骤!
- “疙瘩汤”的这个做法火了,10分钟出锅,好吃解馋,怎么也吃不够