JAVA语言之解读消息中间件-ActiveMQ
从安 2019-06-05 来源 : 阅读 1271 评论 0

摘要:本篇文章主要讲述JAVA语言之解读消息中间件-ActiveMQ,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

本篇文章主要讲述JAVA语言之解读消息中间件-ActiveMQ,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

JAVA语言之解读消息中间件-ActiveMQ

文章大纲

一、消息中间件基础知识
二、ActiveMQ介绍
三、ActiveMQ下载安装(Windows版本)
四、Java操作ActiveMQ代码实战
五、Spring整合ActiveMQ代码实战

一、消息中间件基础知识

https://www.cnblogs.com/WUXIAOCHANG/p/10904987.html

二、ActiveMQ介绍

1. 消息传递模型

1.1 点对点(point-to-point,简称PTP)Queue消息传递模型
  通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。

1.2 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型
  通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。

1.3 两种模型方式比较

JAVA语言之解读消息中间件-ActiveMQ

 

2. ActiveMQ消息格式

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
(1)StreamMessage -- Java原始值的数据流
(2)MapMessage--一套名称-值对
(3)TextMessage--一个字符串对象
(4)ObjectMessage--一个序列化的 Java对象
(5)BytesMessage--一个字节的数据流

三、ActiveMQ下载安装(Windows版本)

1、打开浏览器,访问网址activemq.apache.org; 

2、下载最新的版本,当前最新版本为5.15.5,根据ActiveMQ需要安装的操作系统选择性下载对应的版本,这里我选择Windows版本,然后点击下载ZIP包; 

3、下载完成以后,将zip文件解压到D盘下;

4、在启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求;

5、接下来我们进入到D:\apache-activemq-5.15.5\bin; 

6、根据服务器上操作系统的版本,选择进入到win32还是win64,这里选择进入win64目录,然后双击activemq.bat,这时activemq就启动起来了;

7、打开浏览器,输入//localhost:8161/admin/ ,弹出一个windows安全提示框,提示输入activemq的用户名和密码; 

8、接下来我们打开D:\apache-activemq-5.15.5\conf这个目录,找到jetty-realm.properties文件(该文件保存着用户名和密码信息); 

9、打开该文件,找到文件的末尾,格式是 用户名: 密码,用户角色;

10、角色信息的定义放在D:\apache-activemq-5.15.5\conf下的jetty.xml文件中; 

11、 我们知道了角色定义的位置,角色对应的用户名和密码后,我们就可以使用默认的用户名admin和默认的密码admin来登录系统; 

12、 登录成功以后,就可以看到activemq的主页了;

四、Java操作ActiveMQ代码实战

1. 新建服务端项目activemq-service

1.1 idea创建maven项目 

1.2 pom.xml文件添加依赖

    4.0.0 
    com.wxc    activemq-service    1.0-SNAPSHOT 
                        junit            junit            4.12            
            test-->
         
        
                    org.apache.activemq            activemq-client            5.15.0

1.3 创建测试类
com.wxc.test包下新建TestService.java

其中testQueueProducer方法为队列类型,testTopicProducer方法为发布/订阅类型,其中创建步骤如下:

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。

package com.wxc.test;
import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test;
import javax.jms.*;
public class TestService {
 
    @Test
    public void testQueueProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        //brokerURL服务器的ip及端口号
        //8161是后台管理系统,61616是给java用的tcp端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }
 
    @Test
    public void testTopicProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        // brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
        // 参数:话题的名称。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }
}

温馨提示:8161端口是后台管理系统,61616端口是给java用的tcp端口

2. 新建消费端项目activemq-customer

2.1 idea创建maven项目 

2.2 pom.xml文件添加依赖

    4.0.0 
    com.wxc    activemq-customer    1.0-SNAPSHOT                                        org.apache.maven.plugins                maven-compiler-plugin                                    6                    6                                         
                        junit            junit            4.12            
            test-->
         
        
                    org.apache.activemq            activemq-client            5.15.0

2.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

package com.wxc.test;
import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test;
import javax.jms.*;
public class TestCustomer {
 
    @Test
    public void testQueueConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        //8161是后台管理系统,61616是给java用的tcp端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
 
    @Test
    public void testTopicConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端03。。。。。");
        // 等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

 

3. 新建消费端项目activemq-customer2

3.1 idea创建maven项目 

3.2 pom.xml文件添加依赖

    4.0.0 
    com.wxc    activemq-customer2    1.0-SNAPSHOT                                        org.apache.maven.plugins                maven-compiler-plugin                                    6                    6                                         
                        junit            junit            4.12            
            test-->
         
        
                    org.apache.activemq            activemq-client            5.15.0

3.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

package com.wxc.test;
import org.apache.activemq.ActiveMQConnectionFactory;import org.junit.Test;
import javax.jms.*;
public class TestCustomer {
 
    @Test
    public void testQueueConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        //8161是后台管理系统,61616是给java用的tcp端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
 
    @Test
    public void testTopicConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端03。。。。。");
        // 等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

4. 代码说明

(1)创建了两个客户端的连接,是用于测试过程中体现队列方式只能被一个消费者接收,而发布/订阅方式可以被多个消费者同时收到
(2)8161端口是后台管理系统,61616端口是给java用的tcp端口

5.项目运行

5.1 队列方式

运行两个消费者端

运行服务者端

数据结果如下:

JAVA语言之解读消息中间件-ActiveMQ

所以验证了队列方式只能有一个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它

5.2 发布/订阅方式
运行两个消费者端

运行服务者端

数据结果如下:

JAVA语言之解读消息中间件-ActiveMQ

所以验证了发布/订阅方式可以多个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它

五、Spring整合ActiveMQ代码实战

1. 使用方法

第一步:引用相关的jar包

            org.springframework            spring-jms                            org.springframework            spring-context-support

第二步:配置Activemq整合spring。配置ConnectionFactory

 
    
                    
            
            

第三步:配置生产者。
使用JMSTemplate对象。发送消息。
第四步:在spring容器中配置Destination

 
    
                    
            
                
    
            
                
                        spring-queue

2. 代码测试

2.1 发送消息
第一步:初始化一个spring容器
第二步:从容器中获得JMSTemplate对象。
第三步:从容器中获得一个Destination对象
第四步:使用JMSTemplate对象发送消息,需要知道Destination

@Test
    public void testQueueProducer() throws Exception {
        // 第一步:初始化一个spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        // 第二步:从容器中获得JMSTemplate对象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 第三步:从容器中获得一个Destination对象
        Queue queue = (Queue) applicationContext.getBean("queueDestination");
        // 第四步:使用JMSTemplate对象发送消息,需要知道Destination
        jmsTemplate.send(queue, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage("spring activemq test");
                return textMessage;
            }
        });
    }

2.2 接收消息
Taotao-search-Service中接收消息。
第一步:把Activemq相关的jar包添加到工程中
第二步:创建一个MessageListener的实现类。

public class MyMessageListener implements MessageListener {
 
    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

第三步:配置spring和Activemq整合。

 
    
                    
            
                
                        spring-queue

第四步:测试代码。

@Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }


本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!

 

本文由 @从安 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程