1.第一个kafka程序1.1.创建我们的主题kafka-topics.bat --zookeeper localhost:2181/kafka --create --topic hello-kafka --replication-factor 1 --partitions 4
(主题不创建,可能会造成程序报错,也可在程序中配置如:
spring.kafka.listener.missing-topics-fatal=false
)
1.2.生产者发送消息引入jar:
<dependency>
<groupId>org.Apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
生产者代码示例:
文章插图
1.2.1.必选属性创建生产者对象时有三个属性必须指定 。
【第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题】bootstrap.servers
该属性指定 broker 的地址清单,地址的格式为 host:port 。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查询其他 broker 的信息 。
不过最少提供 2 个 broker 的信息(用逗号分隔,比如: 127.0.0.1:9092,192.168.0.13:9092),一旦其中一个宕机,生产者仍能连接到集群上 。
key.serializer
生产者接口允许使用参数化类型,可以把 JAVA 对象作为键和值传 broker,但是 broker 希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器 。key.serializer 必须设置为实现
org.apache.kafka.common.serialization.Serializer 的接口类,Kafka 的客户端默认提供了ByteArraySerializer,IntegerSerializer, StringSerializer,也可以实现自定义的序列化器 。
value.serializer
同 key.serializer 。
1.3.消费者接受消息代码示例:
文章插图
1.3.1.必选参数bootstrap.servers、key.serializer、value.serializer 含义同生产者
group.id
并非完全必需,它指定了消费者属于哪一个群组,但是创建不属于任何一个群组的消费者并没有问题 。
新版本特点:poll(Duration)这个版本修改了这样的设计,会把元数据获取也计入整个超时时间(更加的合理)
1.4.演示示例1.默认创建主题,只有一个分区时,演示生产者和消费者情况 。
2.修改主题分区为 2(使用管理命令),再重新演示生产者和消费者情况 。
2.Kafka 的生产者2.1.生产者发送消息的基本流程
文章插图
从创建一个 ProducerRecord 对象开始,Producer Record 对象需要包含目标主题和要发送的内容 。我们还可以指定键或分区 。在发送 ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输 。
接下来,数据被传给分区器 。如果之前在 Producer Record 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回 。如果没有指定分区,那么分区器会根据 Producer Record 对象的键来选择一个分区 。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了 。紧接着,这条记录被添加到一个记录批次里(双端队列,尾部写入),这个批次里的所有消息会被发送到相同的主题和分区上 。有一个独立的线程负责把这些记录批次发送到相应的 broker 上 。
服务器在收到这些消息时会返回一个响应 。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量 。如果写入失败,则会返回一个错误 。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息 。
生产者发送消息一般会发生两类错误:
一类是可重试错误,比如连接错误(可通过再次建立连接解决)、无主 no leader(可通过分区重新选举首领解决) 。
另一类是无法通过重试解决,比如“消息太大”异常,具体见 message.max.bytes,这类消息不会进行任何重试,直接抛出异常
2.2.Kafka 三种发送方式我们通过生成者的 send 方法进行发送 。send 方法会返回一个包含 RecordMetadata 的 Future 对象 。RecordMetadata 里包含了目标主题,分区信息和消息的偏移量 。
2.2.1.发送并忘记忽略 send 方法的返回值,不做任何处理 。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息 。
2.2.2.同步发送获得 send 方法返回的 Future 对象,在合适的时候调用 Future 的 get 方法 。
推荐阅读
- 程序员遭遇:一觉睡来7个未接电话,到公司时发现已被踢出群
- 程序员为什么一定要进大公司,除了薪资,这些才是决定性因素
- 某程序员跳槽到银行9天后辞职,晒出技术水平后留言:太落后了
- 成都与盖碗茶,饮盖碗茶有五道程序先容盖碗茶品饮程序
- 高级程序员到底长什么样子?
- 第一个登月的地球人是谁?
- 程序员工作必备:10个超实用的GitHub库
- 应用程序加固Tomcat篇
- 公众号小程序有什么用?
- 程序员用Python实现自动化控制键盘和鼠标