2、工作队列工作队列也就是简单模式的强化版,一个队列是可以多个生产者,也可以有多个消费者来竞争消费消息,但是我们仍需保证队列的幂等性,队列存在就不能再创建同名队列 。
文章插图
下面的每个进程都控制其主线程休眠,让我们可以更好的看到结果 。
2.1.1、Sender1.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;public class Sender1 {private finalstatic String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i < 100; i++){String message = "lbw" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message + "'");Thread.sleep(i*10);}channel.close();connection.close();}}
2.1.2、Sender2.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;public class Sender2 {private finalstatic String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i < 100; i++){String message = "nb" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message + "'");Thread.sleep(i*10);}channel.close();connection.close();}}
2.1.3、Receiver1.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;/** * Created by san */public class Receiver1 {private final staticString QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_NAME, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println("[x] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}
2.1.4、Receiver2.javaimport com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import top.san.RabbitMq.util.RabbitmqConnectionUtil;import java.io.IOException;/** * Created by san */public class Receiver2 {private final staticString QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = RabbitmqConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println("[x] Received2 '"+message+"'");Thread.sleep(1000);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}
2.1.5、结果上面的四个程序都运行起来,结果可以看到如下,依据结果分析,可知,同一个消息队列,是可以有多个生产者和消费者的 。文章插图
文章插图
文章插图
文章插图
3、发布/订阅(fanout)
文章插图
3.1.1、Sender.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import top.san.RabbitMq.util.RabbitmqConnectionUtil;public class Sender {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args){try{//获取连接Connection connection = RabbitmqConnectionUtil.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//发送消息for (int i = 0; i < 5; i++){String message = "卢本伟广场" + i;System.out.println("[send]:" + message);//发送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));Thread.sleep(5 * i);}channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 计算机专业学什么语言好找工作?
- 残疾人|本科没拿到学位证对以后找工作有多大影响?
- 鲜虾萝卜饼的做法
- 没有工作的一年|《没有工作的一年》赚钱还是躺平?小雨租房也乐观,却源于这一点
- 招聘|考研究生反而不好找工作,日本人这点很认真,大家觉得应该学吗?
- 教师|想上岸入编找到理想工作,该如何选择单位?随我一起来看看吧!
- 犀角化毒丸怎么样
- 戴尔Precision 5750移动工作站深度测试
- 教你快速清除Excel工作表保护密码
- excel批量拆分工作簿,用VBA一键拆分,把数据分解到N个工作簿