RabbitMQ发送和接收消息的几种方式


channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel 。同时,我们采用手工ACK的方式,因为我们配置文件配置了 spring.rabbitmq.listener.simple.acknowledge-mode=manual 。一、发送消息的几种方式1.1、默认交换机和routingKey----(个人不推荐使用)    使用默认的交换机exchange或routingKey 。

RabbitMQ发送和接收消息的几种方式

文章插图
图片
调用方法:
RabbitMQ发送和接收消息的几种方式

文章插图
图片
1.2、使用指定routingKey的方式发送(默认的交换机)    使用默认的交换机,routingKey必须为quenue队列的名称 。
调用方法:
RabbitMQ发送和接收消息的几种方式

文章插图
图片
案例:
/** * @Author yangyalin * @Description 测试发送消息(直接使用队列发送,使用默认的交换机) routingKey:即为对列的名称即可 **/public void testSendMsg(String message){rabbitTemplate.convertAndSend(RabbitMQConvertConfig.TEST_QUEUE,message);}1.3、指定交换机和routingKey的方式发送    使用指定的交换机,若绑定routingKey,必须使用指定的模式;若没有绑定,可设置为"" 。
调用方法:
RabbitMQ发送和接收消息的几种方式

文章插图
图片
案例:public void sendDecreStockMessage(DecreStockFromRabbit decreStockFromRabbit){CorrelationData correlationData = https://www.isolves.com/it/wl/sz/2023-11-08/new CorrelationData();correlationData.setId(decreStockFromRabbit.getMessageId());/*** exchange:交换机routingKey:路由键message:消息体内容correlationData:消息唯一ID**/rabbitTemplate.convertAndSend(RabbitMQConvertConfig.ORDER_EXCHANGE,RabbitMQConvertConfig.ORDER_ROUTINGKEY, decreStockFromRabbit,correlationData);}或:rabbitTemplate.convertAndSend("test-exchange","",message);二、接收消息的几种方式2.1、默认交换机,提前创建好队列(TestDirectQueue)/*** 功能描述:当消费同一个队列的时候 , 可通过设置实现能则多劳,* 消息轮询方式订阅* @MethodName: process11* @MethodParam: [testMessage]* @Return: void* @Author: yyalin* @CreateDate: 2022/4/9 17:10*/@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueuepublic void process11(Map testMessage) throws InterruptedException {log.info("消费者收到消息222:" + testMessage.toString());Thread.sleep(200);}2.2、默认交换机,自动创建队列(TEST_QUEUE2)@RabbitListener(queuesToDeclare=@Queue(TopicExchangeConfig.TEST_QUEUE2))@RabbitHandlerpublic void receiveTestMsg2(@Payload String str) throws Exception{log.info("开始接收消息 。。。。。");log.info("接收到的消息:"+str);}2.3、自动创建交换机和队列----(个人推荐)    自动创建且交换机和队列绑定,key可指定也可不指定(默认为队列名称) 。
/******************方案二:使用注解的方式绑定队列在交换机上*******************/@RabbitListener(bindings = @QueueBinding(value=https://www.isolves.com/it/wl/sz/2023-11-08/@Queue(name="directQueue"),exchange=@Exchange(name="directExchange",type = ExchangeTypes.DIRECT),key={"red", "blue"}))//监听的队列名称 TestDirectQueuepublic void directConsumer(String message) {log.info("消费者收到direct消息555: " + message);}@RabbitListener(bindings = @QueueBinding(value=@Queue(name="topicQueue2"),exchange=@Exchange(name="topicExchange",type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"),key="#.new"))public void topicConsumer2(String message) {log.info("消费者收到topic消息888: " + message);}备注:ignoreDeclarationExceptions = "true" : 即使配置出现了错误也不至于整个应用程序都启动失败的情况 。
【RabbitMQ发送和接收消息的几种方式】1、channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel 。
同时,我们采用手工ACK的方式,因为我们配置文件配置了   spring.rabbitmq.listener.simple.acknowledge-mode=manual:
2、channel.basicAck(deliveryTag, false):deliveryTag表示处理的消息条数(一般为1) , 从heaers中取,false表示不批量ack 。
/*** 功能描述: 消费端加上手动确认消息被接收* @MethodName: process* @MethodParam: [message]* @Return: void* @Author: yyalin* @CreateDate: 2022/4/18 19:10*/@RabbitListener(queues = "TestDirectQueue3")//监听的队列名称 TestDirectQueuepublic void process(String message, Channel channel) throws IOException {log.info("DirectReceiver消费者收到消息1: " + message);long msgId=1111L; //消息IDtry {//手动确认消息已消费channel.basicAck(msgId,false);} catch (IOException e) {//把消息失败的消息重新放入到队列channel.basicNack(msgId,false,true);e.printStackTrace();}}


推荐阅读