RabbitMQ——简单队列

RabbitMQ 简述#
RabbitMQ是一个消息代理:它接受并转发消息 。您可以将其视为邮局:当您将要把寄发的邮件投递到邮箱中时,您可以确信Postman 先生最终会将邮件发送给收件人 。在这个比喻中,RabbitMQ是一个邮箱,邮局和邮递员,用来接受,存储和转发二进制数据块的消息 。
队列就像是在RabbitMQ中扮演邮箱的角色 。虽然消息经过RabbitMQ和应用程序,但它们只能存储在队列中 。队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区 。许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据 。
producer即为生产者,用来产生消息发送给队列 。consumer是消费者,需要去读队列内的消息 。producer,consumer和broker(rabbitMQ server)不必驻留在同一个主机上;确实在大多数应用程序中它们是这样分布的 。
简单队列#简单队列是最简单的一种模式,由生产者、队列、消费者组成 。生产者将消息发送给队列,消费者从队列中读取消息完成消费 。
在下图中,“P”是我们的生产者,“C”是我们的消费者 。中间的框是队列 - RabbitMQ代表消费者的消息缓冲区 。

RabbitMQ——简单队列

文章插图
 
JAVA 方式#生产者#Copypackage com.anqi.mq.nat;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class MyProducer {private static final String QUEUE_NAME = "ITEM_QUEUE";public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//实际场景中,消息多为json格式的对象String msg = "hello";//4. 发送三条数据for (int i = 1; i <= 3 ; i++) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("Send message" + i +" : " + msg);}//5. 关闭连接channel.close();connection.close();}}Copy/*** Declare a queue* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;/*** Publish a message* @see com.rabbitmq.client.AMQP.Basic.Publish* @param exchange the exchange to publish the message to* @param routingKey the routing key* @param props other properties for the message - routing headers etc* @param body the message body* @throws java.io.IOException if an error is encountered*/void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;消费者#Copypackage com.anqi.mq.nat;import com.rabbitmq.client.*;import java.io.IOException;public class MyConsumer {private static final String QUEUE_NAME = "ITEM_QUEUE";public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明一个队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");/*true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈 。*///5. 创建消费者并接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(QUEUE_NAME, true, consumer);}}


推荐阅读