
文章插图
今天我们来讨论如何在项目开发中优雅地使用RocketMQ 。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决他们,第三部分介绍如何封装RocketMQ以便更好地使用 。
1. SpringBoot整合RocketMQ在SpringBoot中集成RocketMQ,只需要简单四步:
- 引入相关依赖
<dependency><groupId>org.Apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
- 添加RocketMQ的相关配置
rocketmq:consumer:group: springboot_consumer_group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 10name-server: 10.5.103.6:9876producer:# 发送同一类消息的设置为同一个group,保证唯一group: springboot_producer_group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
- 使用提供的模板工具类RocketMQTemplate发送消息
@RestControllerpublic class NormalProduceController {@Setter(onMethod_ = @Autowired)private RocketMQTemplate rocketmqTemplate;@GetMApping("/test")public SendResult test() {Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();SendResult sendResult = rocketmqTemplate.send(topic, msg);}}
- 实现RocketMQListener接口消费消息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑System.out.println("Received message: " + message);}}
以上4步即可实现SpringBoot与RocketMQ的整合,这部分属于基础知识,不做过多说明 。2 使用RocketMQ会遇到的问题以下是一些在SpringBoot中使用RocketMQ时常遇到的问题,现在为您逐一解决 。
2.1 WARN No appenders could be found for logger启动项目时会在日志中看到如下告警
RocketMQLog:WARN No appenders could be found for logger (io.NETty.util.internal.InternalThreadLocalMap).RocketMQLog:WARN Please initialize the logger system properly.
此时我们只需要在启动类中设置环境变量 rocketmq.client.logUseSlf4j 为 true 明确指定RocketMQ的日志框架@SpringBootApplicationpublic class RocketDemoApplication {public static void main(String[] args) {/** 指定使用的日志框架,否则将会告警* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).* RocketMQLog:WARN Please initialize the logger system properly.*/System.setProperty("rocketmq.client.logUseSlf4j", "true");SpringApplication.run(RocketDemoApplication.class, args);}}
同时还得在配置文件中调整日志级别,不然在控制台会一直看到broker的日志信息logging: level:RocketmqClient: ERRORio:netty: ERROR
2.2 不支持LocalDate 和 LocalDateTime在使用JAVA8后经常会使用LocalDate/LocalDateTime这两个时间类型字段,然而RocketMQ原始配置并不支持Java时间类型,当我们发送的实体消息中包含上述两个字段时,消费端在消费时会出现如下所示的错误 。比如生产者的代码如下:
@GetMapping("/test")public void test(){//普通消息无返回值,只负责发送消息?不等待服务器回应且没有回调函数触发 。RocketMessage rocketMessage = RocketMessage.builder().id(1111L).message("hello,world").localDate(LocalDate.now()).localDateTime(LocalDateTime.now()).build();rocketmqTemplate.convertAndSend(destination,rocketMessage);}
消费者的代码如下:@Component@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")public class RocketMQConsumer implements RocketMQListener<RocketMessage> {@Overridepublic void onMessage(RocketMessage message) {System.out.println("消费消息-" + message);}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 在 SpringBoot 中使用 Spring AOP 实现接口鉴权
- SpringBoot中如何实现限流,这种方式才叫优雅!
- SpringBoot中使用PostgreSQL数据库
- SpringBoot对SpringMVC的自动配置,你知道多少?
- AI开发大一统:谷歌OpenXLA开源,整合所有框架和AI芯片
- SpringBoot启动控制台的banner是怎么回事
- SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
- 158资源整合网怎么样 创业网致富网3158
- 在分布式系统中,SpringBoot 实现接口幂等性
- coso内部控制框架?COSO内部控制与企业风险管理整合框架的比较?