RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会( 二 )

2、工作队列工作队列也就是简单模式的强化版,一个队列是可以多个生产者,也可以有多个消费者来竞争消费消息,但是我们仍需保证队列的幂等性,队列存在就不能再创建同名队列 。

RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 
下面的每个进程都控制其主线程休眠,让我们可以更好的看到结果 。
2.1.1、Sender1.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;public class Sender1 {private finalstatic String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i < 100; i++){String message = "lbw" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message + "'");Thread.sleep(i*10);}channel.close();connection.close();}}2.1.2、Sender2.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;public class Sender2 {private finalstatic String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i < 100; i++){String message = "nb" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message + "'");Thread.sleep(i*10);}channel.close();connection.close();}}2.1.3、Receiver1.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;/** * Created by san */public class Receiver1 {private final staticString QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_NAME, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println("[x] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}2.1.4、Receiver2.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;/** * Created by san */public class Receiver2 {private final staticString QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println("[x] Received2 '"+message+"'");Thread.sleep(1000);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}2.1.5、结果上面的四个程序都运行起来,结果可以看到如下,依据结果分析,可知,同一个消息队列,是可以有多个生产者和消费者的 。
RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 

RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 

RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 

RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 
3、发布/订阅(fanout)
RabbitMq七种工作模式,结合简单的java实例使用,别再说你不会

文章插图
 
3.1.1、Sender.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import top.san.RabbitMq.util.RabbitmqConnectionUtil;public class Sender {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args){try{//获取连接Connection connection = RabbitmqConnectionUtil.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//发送消息for (int i = 0; i < 5; i++){String message = "卢本伟广场" + i;System.out.println("[send]:" + message);//发送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));Thread.sleep(5 * i);}channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}}


推荐阅读