ActiveMQ簡介 ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現 JMS:Java Message Service java消息服務 ActiveMQ:實現JMS規範 JM ...
ActiveMQ簡介
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現
JMS:Java Message Service java消息服務
ActiveMQ:實現JMS規範
JMS只給出介面,具體的實現由中間件完成,AcitveMQ為其中的一種
其它的消息隊列產品:ActiveMQ、RabbitMQ、Kafka、MetaMQ等
消息隊列中間件是分散式系統中的重要組件,主要解決應用耦合,非同步消息,流量削鋒等問題, 實現高性能,高可用,可伸縮和最終一致性的架構
ActiveMQ下載
官網:http://activemq.apache.org/
目錄結構
啟動ActiveMQ
進入bin目錄啟動服務。
http://localhost:8161/admin/queues.jsp
http埠8161:web頁面訪問埠
Tcp埠連接服務埠:61616
預設登陸用戶名,密碼:admin
ActiveMQ操作界面
常用術語
Provider/MessageProvider:生產者
Consumer/MessageConsumer:消費者
PTP:Point To Point,點對點通信消息模型
Pub/Sub:Publish/Subscribe,發佈訂閱消息模型
Queue:隊列,目標類型之一,和PTP結合
Topic:主題,目標類型之一,和Pub/Sub結合
ConnectionFactory:連接工廠,JMS用它創建連接
Connnection:JMS Client到JMS Provider的連接
Destination:消息目的地,由Session創建
Session:會話,由Connection創建,實質上就是發送、接受消息的一個線程,因此生產者、消費者都是Session創建的
View Code
spring整合activeMQ應用
配置生產者
第一步:創建maven,導入spring和activeMQ的坐標或web工程,導入相應activeMQjar包和與spring整合的jar包
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.9</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.2</version> </dependency> </dependencies>pom.xml
第二步:提供spring配置文件(配置生產者相關)引入amq,jms名稱空間
第三步:配置連接工廠(緩存session工廠),配置模板對象
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置連接工廠對象:產生Connection 方式一:通過amq名稱空間創建連接工廠 方式二:可以通過bean標簽創建對象--> <!-- <amq:connectionFactory id="connectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"> </amq:connectionFactory> --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg index="0" value="admin"></constructor-arg> <constructor-arg index="1" value="admin"></constructor-arg> <constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg> </bean> <!-- spring提供優化緩存session對象 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"></property> <property name="sessionCacheSize" value="10"></property> </bean> <!-- spring提供模板對象jmsTemplate:向mq伺服器寫入消息(p2p,pub/sub) --> <!-- 發送點對點消息 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <!-- 通過屬性pubSubDomain指定消息模式:預設值false --> <property name="pubSubDomain" value="false"></property> </bean> <!-- 發送主題模式消息 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <property name="pubSubDomain" value="true"></property> </bean> </beans>
第四步:編寫單元測試方法,在類中註入模板對象JmsTemplate。通過此對象發送消息到隊列
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class ProduceTest { //註入模板對象 @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; @Test public void test() { jmsTemplate.send("test_spring", new MessageCreator() { // 創建對象 public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("tel", "1311111111"); mapMessage.setString("code", "MSXX88sdfsdf"); return mapMessage; } }); } /*public static void main(String[] args) { ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); JmsTemplate jmsTemplate = (JmsTemplate) classPathXmlApplicationContext.getBean("jmsQueueTemplate"); //發送消息 jmsTemplate.send("test_spring", new MessageCreator() { //創建對象 public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("tel", "1311111111"); mapMessage.setString("code", "MSXX88sdfsdf"); return mapMessage; } }); }*/ }
配置消費者
第一步:開發一個類,監聽消息隊列
@Component("consumerListener") public class ConsumerListener implements MessageListener{ //如果註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法 public void onMessage(Message message) { try { MapMessage mapMessage = (MapMessage) message; String tel = mapMessage.getString("tel"); String code = mapMessage.getString("code"); System.out.println(tel+"**********"+code); } catch (JMSException e) { e.printStackTrace(); } } }
第二步:配置spring 配置文件,註冊監聽器
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- 配置連接工廠對象:產生Connection 方式一:通過amq名稱空間創建連接工廠 方式二:可以通過bean標簽創建對象--> <!-- <amq:connectionFactory id="connectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"> </amq:connectionFactory> --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- 構造方法賦值 -->
<constructor-arg index="0" value="admin"></constructor-arg>
<constructor-arg index="1" value="admin"></constructor-arg>
<constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg> </bean> <!-- spring提供優化緩存session對象 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"></property> <property name="sessionCacheSize" value="10"></property> </bean> <context:component-scan base-package="cn.itcast"></context:component-scan> <!-- 在監聽器容器中註冊監聽器對象 acknowledge:設置應答模式 auto自動應答 destination-type:隊列類型(queue,topic) connection-factory:註入連接工廠 jms:listener:節點註入監聽器對象 --> <jms:listener-container acknowledge="auto" destination-type="queue" connection-factory="cachingConnectionFactory"> <!-- destination:監聽哪個隊列 --> <jms:listener destination="test_spring" ref="consumerListener"/> </jms:listener-container> </beans>
測試代碼
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class ConsumerTest { @Test public void test() {
//保證spring工廠不關閉,web項目啟動,tomcat不停止,監聽器會自動監聽隊列中消息 while(true){ } } }
啟動生產者,消費者服務,消息生產者把將消息發送到伺服器,將消息存放在隊列或主題中,消息伺服器會將消息轉發給接受者,ActiveMQ的非同步消息使得消息的發送與接受無必然聯繫,只要將消息發出,消息發出端繼續執行代碼,無需等待消息消費端