JAVA语言之activeMQ和spring的整合
小标 2019-02-18 来源 : 阅读 522 评论 0

摘要:本文主要向大家介绍了JAVA语言之activeMQ和spring的整合,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了JAVA语言之activeMQ和spring的整合,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

JAVA语言之activeMQ和spring的整合



        (1)MQ:Message Queue就是消息队列。

(2)ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。

(3)ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事 情了,但是JSM在当今的J2EE应用中仍然扮演者特殊的地位。

(4)ActiveMQ消息的传递有两种类型:

1)点对点:即一个生产者和一个消费者一一对应。

2)发布/订阅模式:即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

(5)主要特点:

1)多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP。

2)完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。

3)对Spring支持,ActiveMQ可以很容易内嵌到使用spring的系统里面去。

4)通过了常见J2EE服务器(如Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上。

5)支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。

6)支持通过JDBC和journal提供高速的消息持久化。

7)从设计上保证了高性能的集群,客户端-服务器,点对点。

8)支持Ajax。

9)支持与Axis的整合。

10)可以很容易的调用内嵌JMS Provider,进行测试。

二、JSM介绍(番外篇)

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件 (MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

JMS是一个与具体平台无关的API,具有跨平台性。

它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实 际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的 消息后去完成对应的业务逻辑。

JMS定义了五种不同的消息正文格式:

   • StreamMessage -- Java原始值的数据流

   • MapMessage--一套名称-值对

   • TextMessage--一个字符串对象

   •ObjectMessage--一个序列化的 Java对象
   • BytesMessage--一个字节的数据流

三、ActiveMQ下载

下载地址://activemq.apache.org/

四、ActiveMQ安装(Windows版本)

(1)安装环境

JDK1.7及以上版本,配置好环境变量。(不在赘述)

(2)将下载好的apache-activemq-5.14.5解压后(我放在了D盘下),打开目录

D:\apache-activemq-5.14.5\bin\win64,双击activemq.bat文件。如果你是Windows32位的操作系统,那么

你打开的目录是D:\apache-activemq-5.14.5\bin\win32,然后双击其下的activemq.bat文件。这样你的

ActiveMQ服务器就启动了。(注意:请不要双击D:\apache-activemq-5.14.5\bin目录下的activemq.bat

文件,会出现闪退。如果你下载的apache-activemq-5.15.2,那么请参考其他文章)。

(3)安装成功后我们就可以对activeMQ服务器进行访问了。

输入访问地址://192.168.37.161:8161/

     账号:admin

密码:admin

登录成功后出现以下页面:

五、 ActiveMQ整合进Spring

1)在eclipse中创建java项目,创建lib目录,导入jar包,jar包如下:

2)Sender---消息发送者代码

package com.alex.mq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Sender {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-activeMQ.xml");
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        Destination destination = context.getBean(Destination.class);
        for (int i = 0; i < 10; i++) {
            sendMessage(jmsTemplate,destination);
        }
    }
    
    public static void sendMessage(JmsTemplate jmsTemplate,Destination destination){
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message =  new ActiveMQTextMessage();
                
                    message.setText("##---我是被发送的消息内容---##");
                    return message;
                }
        });
    }
}

3)MyMessageListener---消息监听器1(接收消息,消息消费者)

package com.alex.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String name = Thread.currentThread().getName();
        if (null != message) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    String text = textMessage.getText();
                    System.out.println("线程:"+name+"...MyMessageListener接收到消息内容:"+text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

4)MySecondMessageListener---消息监听器2(接收消息,消息消费者)

package com.alex.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MySecondMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String name = Thread.currentThread().getName();
        if (message instanceof TextMessage) {
            try {
                String content = ((TextMessage) message).getText();
                System.out.println("线程:"+name+"...MySecondMessageListener接收到消息内容:"+content);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

5)spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns="//www.springframework.org/schema/beans" xmlns:context="//www.springframework.org/schema/context" xsi:schemalocation=""//www.springframework.org/schema/beans ">
    
    
    <!-- 真正可以提供connection的connectionFactory,由对应的JMS服务厂商提供 -->
    <bean name="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>
    <!-- spring真正用于管理ConnectionFactory的ConnectionFactory -->
    <bean name="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>
    <!-- spring提供的JMS工具类,它可以用于发送消息 -->
    <bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory是spring管理的那个对象 -->
        <property name="connectionFactory" ref="connectionFactory"></property>
    </bean>
    <!-- 这个是队列目的地,点对点的 -->
    <!-- <bean name="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
     -->
    <!-- 这个主题目的地,一对多的 -->
    <bean name="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"></constructor-arg>
    </bean>
    <!-- messageListener实现类 -->
    <bean name="messageListener" class="com.alex.mq.MyMessageListener"></bean>
    <!-- messageListener实现类 -->
    <bean name="secondMessageListener" class="com.alex.mq.MySecondMessageListener"></bean>
    <!-- 消息监听器容器1 -->
    <bean name="jsmContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="topicDestination"></property>
        <property name="messageListener" ref="messageListener"></property>
    </bean>
    <!-- 消息监听器容器2 -->
    <bean name="jsmContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="topicDestination"></property>
        <property name="messageListener" ref="secondMessageListener"></property>
    </bean>
    
</beans>

6)java目录图

六、执行结果(topic---发布/订阅模式)

七、执行结果(queue---点对点)

八、总结

1)activeMQ的点对点是一个生产消息,将消息放入queue队列,然后另一个消费消息,每一个消费者都是一个线程,它们消费的消息数量等于生产者生产的数量。

2)发布/订阅模式:生产者生产的所有消息都会被每一个的消费者消费。

3)生产者发送消息到MQ服务器,消费者监听MQ服务器,获取到消息后进行消费消息。

4)消费消息处理监听机制外还有receive(),接收消息,这种在项目中不常用,项目中常用的是监听机制。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程