A New Tech Director Explained RabbitMQ So Thoroughly, I Was Impressed (Part 5)


            System.out.println(" [x] Sent '" + message + "'");
        }
        // Close the channel and the connection
        channel.close();
        connection.close();
    }

    @Test
    public void consumer() throws java.io.IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        // Create the connection
        Connection connection = factory.newConnection();
        // Create the channel
        final Channel channel = connection.createChannel();
        // Declare the queue (durable, non-exclusive, not auto-deleted)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("[*] Waiting for messages. To exit press CTRL+C");
        // Print each message as it is delivered
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // Consume with auto-ack enabled; the last callback handles consumer cancellation
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}
After running send(), the console prints:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'

After running consumer(), the console prints the received messages (shown as a screenshot in the original article).
For a walkthrough of the example code, see the official tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-java.html
5. Basic Usage
5.1 Shared Code Wrappers
Wrapping the connection factory:
public class RabbitUtil {
    public static ConnectionFactory getConnectionFactory() {
        // Create the connection factory; the values below are the default case used in this article
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }
}
Wrapping the producer:
public class MsgProducer {
    public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String routingKey, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        // Create the connection
        Connection connection = factory.newConnection();
        // Create the channel
        Channel channel = connection.createChannel();
        // Declare the exchange as durable and not auto-deleted
        channel.exchangeDeclare(exchange, exchangeType, true, false, null);
        // Publish the message, encoded as UTF-8 to match the consumer's decoding
        channel.basicPublish(exchange, routingKey, null, message.getBytes("UTF-8"));
        System.out.println("Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
Wrapping the consumer:
public class MsgConsumer {
    public static void consumerMsg(String exchange, String queue, String routingKey)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        // Create the connection
        Connection connection = factory.newConnection();
        // Create the channel
        final Channel channel = connection.createChannel();
        // Declare the queue (durable, non-exclusive, not auto-deleted)
        channel.queueDeclare(queue, true, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(queue, exchange, routingKey);
        System.out.println("[*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [x] Received '" + message + "'");
                } finally {
                    System.out.println(" [x] Done");
                    // Acknowledge this single delivery (multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Disable auto-ack; deliveries are acknowledged manually in handleDelivery
        channel.basicConsume(queue, false, consumer);
    }
}
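One detail worth checking in wrappers like these: the producer turns the message into bytes and the consumer decodes those bytes as UTF-8, so both sides should agree on the charset. Below is a minimal standalone sketch of that round trip. It involves no RabbitMQ at all, and the class name CharsetRoundTrip and its two helper methods are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Standalone illustration only; CharsetRoundTrip is a hypothetical name, not part of the article's code.
class CharsetRoundTrip {
    // Encode the way a producer would before publishing.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Decode the way the consumer wrapper does (UTF-8).
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Hello World RabbitMQ 你好";

        // Same charset on both sides: the text survives the round trip.
        System.out.println(original.equals(decode(encode(original)))); // true

        // Different charset on the receiving side: non-ASCII characters are garbled.
        String wrong = new String(encode(original), StandardCharsets.ISO_8859_1);
        System.out.println(original.equals(wrong)); // false
    }
}
```

With ASCII-only messages such as the "Hello World" strings in this article the mismatch would go unnoticed, which is why naming the charset explicitly on both sides is the safer habit.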
5.2 Direct Exchange
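A direct exchange delivers a message to every queue whose binding key exactly matches the message's routing key. As a rough mental model, here is a toy in-memory sketch of that matching rule. It is not the RabbitMQ client API, and the class name DirectRoutingSketch and the queue and key names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of direct-exchange routing; hypothetical, not the RabbitMQ client API.
class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Bind a queue to this exchange under an exact binding key.
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Return a copy of the queue names that would receive a message with this routing key.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queue.error", "error");
        exchange.bind("queue.all", "error");
        exchange.bind("queue.all", "info");

        System.out.println(exchange.route("error")); // [queue.error, queue.all]
        System.out.println(exchange.route("info"));  // [queue.all]
        System.out.println(exchange.route("debug")); // [] (no binding matches)
    }
}
```

In the sketch a message whose routing key matches no binding simply goes nowhere; in the wrappers above, the binding itself is created by the consumer's queueBind call.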
5.2.1 Direct Example
Producer:
public class DirectProducer {
    private static final String EXCHANGE_NAME = "direct.exchange";

