10分钟带你逆袭Kafka!( 十 )

 
定义生产者发送成功的回调函数: 
import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.RecordMetadata;/** * @ClassName MyCallback * @Description TODO * @Author lingxiangxiang * @Date 3:51 PM * @Version 1.0 **/public class MyCallback implements Callback {    private Object msg;    public MyCallback(Object msg) {        this.msg = msg;    }    @Override    public void onCompletion(RecordMetadata metadata, Exception e) {        System.out.println("topic = " + metadata.topic());        System.out.println("partiton = " + metadata.partition());        System.out.println("offset = " + metadata.offset());        System.out.println(msg);    }} 
生产者测试类:在生产者测试类中,自己遇到一个坑,就是最后自己没有加 sleep,就是怎么检查自己的代码都没有问题,但是最后就是没法发送成功消息,最后加了一个 sleep 就可以了 。 因为主函数 main 已经执行完退出,但是消息并没有发送完成,需要进行等待一下 。当然,你在生产环境中可能不会遇到这样问题,呵呵! 代码如下: 
import static java.lang.Thread.sleep;/** * @ClassName MyKafkaProducerTest * @Description TODO * @Author lingxiangxiang * @Date 3:46 PM * @Version 1.0 **/public class MyKafkaProducerTest {    public static void main(String[] args) throws InterruptedException {        MyKafkaProducer producer = new MyKafkaProducer();        boolean result = producer.sendMsg();        System.out.println("send msg " + result);        sleep(1000);    }} 
消费者类: 
import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/public class MyKafkaConsumer extends ShutdownableThread {    private KafkaConsumer<Integer, String> consumer;    public MyKafkaConsumer() {        super("KafkaConsumerTest", false);        Properties properties = new Properties();        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");        properties.put("group.id", "mygroup");        properties.put("enable.auto.commit", "true");        properties.put("auto.commit.interval.ms", "1000");        properties.put("session.timeout.ms", "30000");        properties.put("heartbeat.interval.ms", "10000");        properties.put("auto.offset.reset", "earliest");        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        this.consumer = new KafkaConsumer<Integer, String>(properties);    }    @Override    public void doWork() {        consumer.subscribe(Arrays.asList("test2"));        ConsumerRecords<Integer, String>records = consumer.poll(1000);        for (ConsumerRecord record : records) {            System.out.println("topic = " + record.topic());            System.out.println("partition = " + record.partition());            System.out.println("key = " + record.key());            System.out.println("value = " + record.value());        }    }}


推荐阅读