SpringBoot整合RocketMQ,老鸟们都是这么玩的!


SpringBoot整合RocketMQ,老鸟们都是这么玩的!

文章插图
今天我们来讨论如何在项目开发中优雅地使用RocketMQ 。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决他们,第三部分介绍如何封装RocketMQ以便更好地使用 。
1. SpringBoot整合RocketMQ在SpringBoot中集成RocketMQ,只需要简单四步:
  1. 引入相关依赖
<dependency><groupId>org.Apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
  1. 添加RocketMQ的相关配置
 rocketmq:consumer:group: springboot_consumer_group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 10name-server: 10.5.103.6:9876producer:# 发送同一类消息的设置为同一个group,保证唯一group: springboot_producer_group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
  1. 使用提供的模板工具类RocketMQTemplate发送消息
 @RestControllerpublic class NormalProduceController {@Setter(onMethod_ = @Autowired)private RocketMQTemplate rocketmqTemplate;@GetMApping("/test")public SendResult test() {Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();SendResult sendResult = rocketmqTemplate.send(topic, msg);}}
  1. 实现RocketMQListener接口消费消息
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑System.out.println("Received message: " + message);}}以上4步即可实现SpringBoot与RocketMQ的整合,这部分属于基础知识,不做过多说明 。
2 使用RocketMQ会遇到的问题以下是一些在SpringBoot中使用RocketMQ时常遇到的问题,现在为您逐一解决 。
2.1 WARN No appenders could be found for logger启动项目时会在日志中看到如下告警
RocketMQLog:WARN No appenders could be found for logger (io.NETty.util.internal.InternalThreadLocalMap).RocketMQLog:WARN Please initialize the logger system properly.此时我们只需要在启动类中设置环境变量 rocketmq.client.logUseSlf4j 为 true 明确指定RocketMQ的日志框架
 @SpringBootApplicationpublic class RocketDemoApplication {public static void main(String[] args) {/** 指定使用的日志框架,否则将会告警* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).* RocketMQLog:WARN Please initialize the logger system properly.*/System.setProperty("rocketmq.client.logUseSlf4j", "true");SpringApplication.run(RocketDemoApplication.class, args);}}同时还得在配置文件中调整日志级别,不然在控制台会一直看到broker的日志信息
logging: level:RocketmqClient: ERRORio:netty: ERROR2.2 不支持LocalDate 和 LocalDateTime在使用JAVA8后经常会使用LocalDate/LocalDateTime这两个时间类型字段,然而RocketMQ原始配置并不支持Java时间类型,当我们发送的实体消息中包含上述两个字段时,消费端在消费时会出现如下所示的错误 。
比如生产者的代码如下:
 @GetMapping("/test")public void test(){//普通消息无返回值,只负责发送消息?不等待服务器回应且没有回调函数触发 。RocketMessage rocketMessage = RocketMessage.builder().id(1111L).message("hello,world").localDate(LocalDate.now()).localDateTime(LocalDateTime.now()).build();rocketmqTemplate.convertAndSend(destination,rocketMessage);}消费者的代码如下:
 @Component@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")public class RocketMQConsumer implements RocketMQListener<RocketMessage> {@Overridepublic void onMessage(RocketMessage message) {System.out.println("消费消息-" + message);}}


推荐阅读