RabbitMQ之springboot操作

环境搭建// maven<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>//Applicationspring.application.name=rabbitmq-demoserver.port= 9099spring.rabbitmq.host=主机spring.rabbitmq.port=5672spring.rabbitmq.username=test-userspring.rabbitmq.password=123spring.rabbitmq.virtual-host=/test-v //虚拟主机代码操作直连工作模式// 生产者@RestControllerpublic class RabbitController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("createMQ")public void createMQ(){//队列名称 没有则创建发送消息信息rabbitTemplate.convertAndSend("test-q","hello");}}//消费者@Component//参数队列名称是否持久化是否自动删除@RabbitListener(queuesToDeclare = @Queue(value = https://www.isolves.com/it/cxkf/jiagou/2022-09-07/"test-q",durable = "true",autoDelete = "true"))//此处有默认值 可以只写队列名称,其它参数默认值为持久化 非独占 非自动删除public class CustomerMQ {@RabbitHandlerpublic void receive(String msg){System.out.println("msg is "+msg);}}work模型默认轮询该模型中消息队列只负责将消息按轮询发送至所有监听中的消费者, 与消费者的内部逻辑毫无关系
@GetMapping("workMQ")public void workMQ(){// 循环10for (int i = 0; i < 10; i++) {//队列名称没有则创建(随便起)发送消息内容rabbitTemplate.convertAndSend("work","hello");}}//work模型消费者监听类@Componentpublic class WorkCustomer { // 注意 此处 rabbitListener是作用在方法上了 这样编程更加容易@RabbitListener(queuesToDeclare = @Queue("work"))public void receive(String msg) throws InterruptedException {//此处模拟消费者1执行的慢 消费者2 执行的快//很显然执行结果与消费者执行快慢没有联系 是每消费者执行5条Thread.sleep(2000);System.out.println("receive1 msg is "+msg);}@RabbitListener(queuesToDeclare = @Queue("work"))public void receive2(String msg){System.out.println("receive2 msg is "+msg);}}按劳分配顾名思义,此策略下的消息队列分配消息是与 消费者的逻辑有直接联系的, 消费者需要消费完该消息并 反馈消息队列 ,消息队列才会继续发送消息至该消费者 。
// 代码与上面一致 application.properties 添加一行 限制接收消息spring.rabbitmq.listener.simple.prefetch=1//即可广播模型(fanout)该场景是经典的广播模式,消费者的消息将会发送至所有 监听中的消费者
//生产者 接口@GetMapping("fanoutMQ")public void fanoutMQ(){for (int i = 0; i < 10; i++) {// 第一个参数交换机名称 与消费者对应即可//第二个参数 队列名称 此处使用临时队列 不需要值 第三参数 发送消息内容rabbitTemplate.convertAndSend("test-fanout","","hello");}}//消费者类@Componentpublic class FanoutCustomer {@RabbitListener(bindings = {@QueueBinding(value = https://www.isolves.com/it/cxkf/jiagou/2022-09-07/@Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-fanout",type = "fanout"))})public void receive(String msg) throws InterruptedException {Thread.sleep(2000);System.out.println("receive1 msg is "+msg);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-fanout",type = "fanout"))})public void receive2(String msg){System.out.println("receive2 msg is "+msg);}}路由模型该模型使用固定的路由key 来选择发送消息至所有监听的消费者
// 生产者@GetMapping("directMQ")public void directMQ(){// 第一个参数交换机名称 与消费者对应即可 第二个参数 路由key 该参数决定发送至哪个消费者//第三参数 发送消息内容rabbitTemplate.convertAndSend("test-direct","key1","hello");}//消费者// 说明: 此处定义四个监听消费者一监听的key:key1 key2 key 3 二: key1 三: key2//四 :key3 /**此处消费者发送是的key1 很明显消息只可能被 消费者一和消费者二接收到*/@Componentpublic class DirectCustomer {@RabbitListener(bindings = {@QueueBinding(value = https://www.isolves.com/it/cxkf/jiagou/2022-09-07/@Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-direct",type = "direct") ,// 该参数默认类型就是directkey = { "key1","key2","key3"})})public void receive(String msg) throws InterruptedException {System.out.println("receive1 msg is "+msg);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-direct",type = "direct") ,// 该参数默认类型就是directkey = { "key1"})})public void receive2(String msg){System.out.println("receive2 msg is "+msg);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-direct",type = "direct") ,// 该参数默认类型就是directkey = { "key2"})})public void receive3(String msg) throws InterruptedException {Thread.sleep(2000);System.out.println("receive1 msg is "+msg);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,// 代表使用的临时队列//value 交换机名称 与生产者对应type 交换机类型 fanoutexchange = @Exchange(value= "test-direct",type = "direct") ,// 该参数默认类型就是directkey = { "key3"})})public void receive4(String msg){System.out.println("receive2 msg is "+msg);}}


推荐阅读