10分钟带你逆袭Kafka!(11)

 
 消费者的测试类: 
/** * @ClassName MyConsumerTest * @Description TODO * @Author lingxiangxiang * @Date 4:23 PM * @Version 1.0 **/public class MyConsumerTest {    public static void main(String[] args) {        MyKafkaConsumer consumer = new MyKafkaConsumer();        consumer.start();        System.out.println("==================");    }} 

10分钟带你逆袭Kafka!

文章插图
 
 
②消费者同步手动提交 
前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交 可能会出现消息重复消费的情况 。
 
所以在生产环境下,很多时候需要对 Offset 进行手动提交,以解决重复消费的问题 。
 
手动提交又可以划分为同步提交、异步提交,同异步联合提交 。这些提交方式仅仅是 doWork() 方法不相同,其构造器是相同的 。
 
所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现三种不同的提交方式 。
 
同步提交方式是,消费者向 Broker 提交 Offset 后等待 Broker 成功响应 。若没有收到响应,则会重新提交,直到获取到响应 。
 
而在这个等待过程中,消费者是阻塞的 。其严重影响了消费者的吞吐量 。
 
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/public class MyKafkaConsumer extends ShutdownableThread {    private KafkaConsumer<Integer, String> consumer;    public MyKafkaConsumer() {        super("KafkaConsumerTest", false);        Properties properties = new Properties();        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");        properties.put("group.id", "mygroup");      // 这里要修改成手动提交        properties.put("enable.auto.commit", "false");        // properties.put("auto.commit.interval.ms", "1000");        properties.put("session.timeout.ms", "30000");        properties.put("heartbeat.interval.ms", "10000");        properties.put("auto.offset.reset", "earliest");        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        this.consumer = new KafkaConsumer<Integer, String>(properties);    }    @Override    public void doWork() {        consumer.subscribe(Arrays.asList("test2"));        ConsumerRecords<Integer, String>records = consumer.poll(1000);        for (ConsumerRecord record : records) {            System.out.println("topic = " + record.topic());            System.out.println("partition = " + record.partition());            System.out.println("key = " + record.key());            System.out.println("value = " + record.value());          //手动同步提交          consumer.commitSync();        }    }}


推荐阅读