springcloud微服务架构开发实战:分布式消息总线( 二 )


消息总线常见的实现方式《分布式系统常用技术及案例分析》一书列举了非常多的流行的、开源的分布式消息服务,如Apache ActiveMQ、RabbitMQ、Apache RocketMQ、Apache Kafka等 。这些消息中间件都实现了点对点模式及订阅/发布模式等常见的消息模式 。
以下例子演示的是使用ActiveMQ实现生产者—消费者的JAVA实现方式 。
生产者程序Producer.java:
public class Producer{private static final Logger LOGGER=LoggerFactory.getLogger (Producer.class);private static final string BROKER_URE = ActiveMQConnection.DEFAULT_BROKER URL;private static final String SUBJECT= "waylau-queue";public static void main (String[] args) throws JMSException f//初始化连接工厂ConnectionFactory connectionFactory= new ActiveMQConnectionFactory(BROKER_URL);//获得连接Connection conn = connectionFactory.createConnection();//启动连接conn.start(;//创建session,第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建队列Destination dest = session.createQueue(SUBJECT);//createTopic方法用来创建Topic//session.createTopic ("TOPIC");//通过session 可以创建消息的生产者MessageProducer producer = session.createProducer(dest);for(int i=0;i<100;i++){//初始化一个MQ消息TextMessage message= session.createTextMessage ("Welcome towaylau.com"+i);//发送消息producer. send(message);LOGGER.info("send message {}",i);//关闭 MQ 连接conn.close();}}消费者程序Consumer.java:
public class Consumer implements MessageListener {private static finalLogger LOGGER = LoggerFactory.getLogger(Consumer.class);private static final String BROKER_URL = ActiveMQConnection.DEFAULTBROKER URL;private static final string SUBJECT = "waylau-queue";public static void main(String[] args) throws JMSExceptionf//初始化 ConnectionFactoryConnectionFactory connectionFactory =new ActiveMOConnectionFactory(BROKER_URL);//创建Mo连接Connection conn = connectionFactory.createConnection();//启动连接conn .start(;//创建会话Session session= conn.createSession (false,Session.AUTO_ACKNOWLEDGE);//通过会话创建目标Destination dest = session.createQueue(SUBJECT);//创建 MO 消息的消费者MessageConsumer consumer = session.createConsumer(dest);//初始化 MessageListenerconsumer me=newConsumer();//给消费者设定监听对象consumer .setMessageListener (me);@overridepublic void onMessage(Message message){TextMessage txtMessage =(TextMessage)message;try{LOGGER.info("get message " + txtMessage.getText());}catch (JMSException e) {LOGGER.error("error {}",e));}}执行命令来启动ActiveMQa:
bin/activemg start生产者执行如下命令:
mvn clean compile exec:java -Dexec.mainClass=com.waylau.activemq.ProducerApp输出如下 。
20:12:10.807 [ActiveMQ Task-1]INEO org.apache.activemq.transport.failover.FailoverTransport- Successfully connected to tcp://localhost:6161620:12:10.928[main] INFOcom.waylau.activemq.Producer- send message 020:12:10.963 [main] INPO com.waylau.activemq.Producer- send message 120:12:10.992 [main] INFO com.waylau.activemq.Producer - send message 220:12:11.019[main] INFO com.waylau.activemq.Producer - send message 320:12:11.036[main] INFOcom.waylau.activemq.Producer- send message 420:12:11.058 [main] INFO com.waylau.activemq.producer -send message 520:12:11.085[main] INFOcom.waylau.activemq.Producer - send message620:12:11.113 [main] INFOcom.waylau.activemq.Producer - send message 720:12:11.141[main] INFOcom.waylau.activemq.Producer - send message 820:12:11.191 [main] INFO com.waylau.activemq.Producer- send message 9消费者执行如下命令:
mvn clean compile exec:java-Dexec.mainClass=com.waylau.activemq. ConsumerApp输出如下 。
20:12:05.262[ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport- Successfully connected to tcp://localhost:6161620:12:10.875 [ActiveMQ Session Task-1] INEOcom.waylau.activemg.Consumer -get message welcome to waylau.com o20:12:10.939 [ActiveMQ Session Task-1]INFO com.waylau.activemq.Consumer-get message welcome to waylau.com 120:12:10.965 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-get message Welcome to waylau.com 220:12:10.994 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -get message Welcome to waylau .com 320:12:11.020 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-get message Welcome to waylau.com 420:12:11.038 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-get message Welcome to waylau.com 520:12:11.059 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -get message Welcome to waylau.com620:12:11.086[ActiveMQ Session Task-1] INEO com.waylau.activemq. Consumer-get message welcome to waylau.com 720:12:11.114[ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-get message Welcome to waylau.com 820:12:11.142 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-get message Welcome to waylau.com 9


推荐阅读