摘要:本篇文章主要讲述JAVA语言之解读消息中间件-ActiveMQ,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
本篇文章主要讲述JAVA语言之解读消息中间件-ActiveMQ,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
文章大纲
一、消息中间件基础知识
二、ActiveMQ介绍
三、ActiveMQ下载安装(Windows版本)
四、Java操作ActiveMQ代码实战
五、Spring整合ActiveMQ代码实战
https://www.cnblogs.com/WUXIAOCHANG/p/10904987.html
1. 消息传递模型
1.1 点对点(point-to-point,简称PTP)Queue消息传递模型
通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。
1.2 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型
通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。
1.3 两种模型方式比较
2. ActiveMQ消息格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
(1)StreamMessage -- Java原始值的数据流
(2)MapMessage--一套名称-值对
(3)TextMessage--一个字符串对象
(4)ObjectMessage--一个序列化的 Java对象
(5)BytesMessage--一个字节的数据流
1、打开浏览器,访问网址activemq.apache.org;
2、下载最新的版本,当前最新版本为5.15.5,根据ActiveMQ需要安装的操作系统选择性下载对应的版本,这里我选择Windows版本,然后点击下载ZIP包;
3、下载完成以后,将zip文件解压到D盘下;
4、在启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求;
5、接下来我们进入到D:\apache-activemq-5.15.5\bin;
6、根据服务器上操作系统的版本,选择进入到win32还是win64,这里选择进入win64目录,然后双击activemq.bat,这时activemq就启动起来了;
7、打开浏览器,输入//localhost:8161/admin/ ,弹出一个windows安全提示框,提示输入activemq的用户名和密码;
8、接下来我们打开D:\apache-activemq-5.15.5\conf这个目录,找到jetty-realm.properties文件(该文件保存着用户名和密码信息);
9、打开该文件,找到文件的末尾,格式是 用户名: 密码,用户角色;
10、角色信息的定义放在D:\apache-activemq-5.15.5\conf下的jetty.xml文件中;
11、 我们知道了角色定义的位置,角色对应的用户名和密码后,我们就可以使用默认的用户名admin和默认的密码admin来登录系统;
12、 登录成功以后,就可以看到activemq的主页了;
1. 新建服务端项目activemq-service
1.1 idea创建maven项目
1.2 pom.xml文件添加依赖
4.0.0 com.wxc activemq-service 1.0-SNAPSHOT junit junit 4.12 test--> org.apache.activemq activemq-client 5.15.0
1.3 创建测试类
com.wxc.test包下新建TestService.java
其中testQueueProducer方法为队列类型,testTopicProducer方法为发布/订阅类型,其中创建步骤如下:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test; import javax.jms.*; public class TestService { @Test public void testQueueProducer() throws Exception { // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 //brokerURL服务器的ip及端口号 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接,调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:队列的名称。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(queue); // 第七步:创建一个Message对象,创建一个TextMessage对象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } @Test public void testTopicProducer() throws Exception { // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 // brokerURL服务器的ip及端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接,调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。 // 参数:话题的名称。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 第七步:创建一个Message对象,创建一个TextMessage对象。 /* * TextMessage message = new ActiveMQTextMessage(); message.setText( * "hello activeMq,this is my first test."); */ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } }
温馨提示:8161端口是后台管理系统,61616端口是给java用的tcp端口
2. 新建消费端项目activemq-customer
2.1 idea创建maven项目
2.2 pom.xml文件添加依赖
4.0.0 com.wxc activemq-customer 1.0-SNAPSHOT org.apache.maven.plugins maven-compiler-plugin 6 6 junit junit 4.12 test--> org.apache.activemq activemq-client 5.15.0
2.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:创建一个ConnectionFactory对象。 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中获得一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Consumer对象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:创建一个ConnectionFactory对象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中获得一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象创建一个Consumer对象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消费端03。。。。。"); // 等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } }
3. 新建消费端项目activemq-customer2
3.1 idea创建maven项目
3.2 pom.xml文件添加依赖
4.0.0 com.wxc activemq-customer2 1.0-SNAPSHOT org.apache.maven.plugins maven-compiler-plugin 6 6 junit junit 4.12 test--> org.apache.activemq activemq-client 5.15.0
3.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:创建一个ConnectionFactory对象。 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中获得一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Consumer对象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:创建一个ConnectionFactory对象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中获得一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象创建一个Consumer对象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消费端03。。。。。"); // 等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } }
4. 代码说明
(1)创建了两个客户端的连接,是用于测试过程中体现队列方式只能被一个消费者接收,而发布/订阅方式可以被多个消费者同时收到
(2)8161端口是后台管理系统,61616端口是给java用的tcp端口
5.项目运行
5.1 队列方式
运行两个消费者端
运行服务者端
数据结果如下:
所以验证了队列方式只能有一个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它
5.2 发布/订阅方式
运行两个消费者端
运行服务者端
数据结果如下:
所以验证了发布/订阅方式可以多个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它
1. 使用方法
第一步:引用相关的jar包
org.springframework spring-jms org.springframework spring-context-support
第二步:配置Activemq整合spring。配置ConnectionFactory
第三步:配置生产者。
使用JMSTemplate对象。发送消息。
第四步:在spring容器中配置Destination
spring-queue
2. 代码测试
2.1 发送消息
第一步:初始化一个spring容器
第二步:从容器中获得JMSTemplate对象。
第三步:从容器中获得一个Destination对象
第四步:使用JMSTemplate对象发送消息,需要知道Destination
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一个spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:从容器中获得JMSTemplate对象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:从容器中获得一个Destination对象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate对象发送消息,需要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); }
2.2 接收消息
Taotao-search-Service中接收消息。
第一步:把Activemq相关的jar包添加到工程中
第二步:创建一个MessageListener的实现类。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息内容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
第三步:配置spring和Activemq整合。
spring-queue
第四步:测试代码。
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号