10分钟带你逆袭Kafka!(12)

 
③消费者异步手工提交 
手动同步提交方式需要等待 Broker 的成功响应,效率太低,影响消费者的吞吐量 。
 
异步提交方式是,消费者向 Broker 提交 Offset 后不用等待成功响应,所以其增加了消费者的吞吐量 。
 
import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/public class MyKafkaConsumer extends ShutdownableThread {    private KafkaConsumer<Integer, String> consumer;    public MyKafkaConsumer() {        super("KafkaConsumerTest", false);        Properties properties = new Properties();        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");        properties.put("group.id", "mygroup");      // 这里要修改成手动提交        properties.put("enable.auto.commit", "false");        // properties.put("auto.commit.interval.ms", "1000");        properties.put("session.timeout.ms", "30000");        properties.put("heartbeat.interval.ms", "10000");        properties.put("auto.offset.reset", "earliest");        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        this.consumer = new KafkaConsumer<Integer, String>(properties);    }    @Override    public void doWork() {        consumer.subscribe(Arrays.asList("test2"));        ConsumerRecords<Integer, String>records = consumer.poll(1000);        for (ConsumerRecord record : records) {            System.out.println("topic = " + record.topic());            System.out.println("partition = " + record.partition());            System.out.println("key = " + record.key());            System.out.println("value = " + record.value());          //手动同步提交          // consumer.commitSync();          //手动异步提交          // consumer.commitAsync();          // 带回调公共的手动异步提交          consumer.commitAsync((offsets, e) -> {            if(e != null) {              System.out.println("提交次数, offsets = " + offsets);              System.out.println("exception = " + e);            }          });        }    }} 
Spring Boot 使用 Kafka
 
现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的 API,就是有点 Low 了啊,那 Kafka 是如何和 Spring Boot 进行联合的呢?


推荐阅读