Apache activeMQ消息隊列

来源:https://www.cnblogs.com/you43n/archive/2018/04/21/8904265.html
-Advertisement-
Play Games

ActiveMQ簡介 ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現 JMS:Java Message Service java消息服務 ActiveMQ:實現JMS規範 JM ...


ActiveMQ簡介

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現

  JMS:Java Message Service    java消息服務     

  ActiveMQ:實現JMS規範

  JMS只給出介面,具體的實現由中間件完成,AcitveMQ為其中的一種

  其它的消息隊列產品:ActiveMQ、RabbitMQ、Kafka、MetaMQ等

  消息隊列中間件是分散式系統中的重要組件,主要解決應用耦合,非同步消息,流量削鋒等問題,  實現高性能,高可用,可伸縮和最終一致性的架構

ActiveMQ下載

  官網:http://activemq.apache.org/

  

目錄結構

啟動ActiveMQ

  進入bin目錄啟動服務。
  http://localhost:8161/admin/queues.jsp
  http埠8161:web頁面訪問埠
  Tcp埠連接服務埠:61616
  預設登陸用戶名,密碼:admin

ActiveMQ操作界面 

常用術語

    Provider/MessageProvider:生產者
    Consumer/MessageConsumer:消費者
    PTP:Point To Point,點對點通信消息模型
    Pub/Sub:Publish/Subscribe,發佈訂閱消息模型
    Queue:隊列,目標類型之一,和PTP結合
    Topic:主題,目標類型之一,和Pub/Sub結合
    ConnectionFactory:連接工廠,JMS用它創建連接
    Connnection:JMS Client到JMS Provider的連接
    Destination:消息目的地,由Session創建
    Session:會話,由Connection創建,實質上就是發送、接受消息的一個線程,因此生產者、消費者都是Session創建的
View Code

spring整合activeMQ應用

配置生產者

第一步:創建maven,導入spring和activeMQ的坐標或web工程,導入相應activeMQjar包和與spring整合的jar包

<dependencies>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.2.0</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>4.2.4.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.9</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>4.2.4.RELEASE</version>
      </dependency>
  <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>4.2</version>
    </dependency>
  </dependencies>
pom.xml

第二步:提供spring配置文件(配置生產者相關)引入amq,jms名稱空間

第三步:配置連接工廠(緩存session工廠),配置模板對象

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" 
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/data/jpa 
                        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
                        http://activemq.apache.org/schema/core
                        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">               
   <!-- 配置連接工廠對象:產生Connection  方式一:通過amq名稱空間創建連接工廠  方式二:可以通過bean標簽創建對象-->
    <!-- <amq:connectionFactory 
        id="connectionFactory" 
        userName="admin" password="admin" 
        brokerURL="tcp://localhost:61616">
    </amq:connectionFactory> -->
    
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="admin"></constructor-arg>
        <constructor-arg index="1" value="admin"></constructor-arg>
        <constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg>
    </bean>
    
    
    <!-- spring提供優化緩存session對象 -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"></property>
        <property name="sessionCacheSize" value="10"></property>
    </bean>
    
    
    <!-- spring提供模板對象jmsTemplate:向mq伺服器寫入消息(p2p,pub/sub) -->
    
    <!-- 發送點對點消息 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"></property>
        <!-- 通過屬性pubSubDomain指定消息模式:預設值false  -->
        <property name="pubSubDomain" value="false"></property>
    </bean>
    
    <!-- 發送主題模式消息 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"></property>
        <property name="pubSubDomain" value="true"></property>
    </bean>
</beans>

第四步:編寫單元測試方法,在類中註入模板對象JmsTemplate。通過此對象發送消息到隊列

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class ProduceTest {

    //註入模板對象
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    @Test
    public void test() {
        jmsTemplate.send("test_spring", new MessageCreator() {

            // 創建對象
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("tel", "1311111111");
                mapMessage.setString("code", "MSXX88sdfsdf");
                return mapMessage;
            }
        });
    }

    /*public static void main(String[] args) {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) classPathXmlApplicationContext.getBean("jmsQueueTemplate");
        //發送消息
        jmsTemplate.send("test_spring", new MessageCreator() {
            
            //創建對象
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("tel", "1311111111");
                mapMessage.setString("code", "MSXX88sdfsdf");
                return mapMessage;
            }
        });
    }*/
}

配置消費者

第一步:開發一個類,監聽消息隊列

@Component("consumerListener")
public class ConsumerListener implements MessageListener{

    //如果註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法
    public void onMessage(Message message) {
        try {
            MapMessage mapMessage = (MapMessage) message;
            String tel = mapMessage.getString("tel");
            String code = mapMessage.getString("code");
            System.out.println(tel+"**********"+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

第二步:配置spring 配置文件,註冊監聽器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" 
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/data/jpa 
                        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
                        http://activemq.apache.org/schema/core
                        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms.xsd">
                        
    <!-- 配置連接工廠對象:產生Connection  方式一:通過amq名稱空間創建連接工廠  方式二:可以通過bean標簽創建對象-->
    <!-- <amq:connectionFactory 
        id="connectionFactory" 
        userName="admin" password="admin" 
        brokerURL="tcp://localhost:61616">
    </amq:connectionFactory> -->
    
       <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <!-- 構造方法賦值 -->
<constructor-arg index="0" value="admin"></constructor-arg>

<constructor-arg index="1" value="admin"></constructor-arg>
        <constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg>
    </bean>
    
    
    <!-- spring提供優化緩存session對象 -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"></property>
        <property name="sessionCacheSize" value="10"></property>
    </bean>
    
    <context:component-scan base-package="cn.itcast"></context:component-scan>
    
    <!-- 在監聽器容器中註冊監聽器對象
        acknowledge:設置應答模式  auto自動應答
        destination-type:隊列類型(queue,topic)
        connection-factory:註入連接工廠
        
        jms:listener:節點註入監聽器對象
     -->
    <jms:listener-container 
            acknowledge="auto"  destination-type="queue"  connection-factory="cachingConnectionFactory">
            <!-- destination:監聽哪個隊列 -->
            <jms:listener destination="test_spring" ref="consumerListener"/>
    </jms:listener-container>

</beans>

測試代碼

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class ConsumerTest {


    @Test
    public void test() {
    //保證spring工廠不關閉,web項目啟動,tomcat不停止,監聽器會自動監聽隊列中消息
while(true){ } } }

啟動生產者,消費者服務,消息生產者把將消息發送到伺服器,將消息存放在隊列或主題中,消息伺服器會將消息轉發給接受者,ActiveMQ的非同步消息使得消息的發送與接受無必然聯繫,只要將消息發出,消息發出端繼續執行代碼,無需等待消息消費端



 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這個問題在本科的時候就接觸過了,這兩天做筆試題的時候又看到覺得有必要碼一下。 高內聚低耦合,是軟體工程中的概念,是判斷設計好壞的標準,主要是面向對象的設計,看類的內聚性是否高,耦合度是否低。 下文轉至 https://blog.csdn.net/walid1992/article/details/7 ...
  • 今天周六,休息,剛下過雨,有風。 哈哈,像不像古龍的小說。程式員不是機器人,不是國家總理,沒有那麼忙。而老闆講究的永遠是利益,利用,所以當你沒利用價值的時候,看老闆能閑養你三個月嗎?程式員的賺錢工具就是技術,別人無法替代的技術。天天休息不好,失眠,心亂,只能寫篇博客靜靜心。 在面向對象中,每個對象都 ...
  • 本題要求實現一個字元串查找的簡單函數。 函數介面定義: 函數search在字元串s中查找子串t,返回子串t在s中的首地址。若未找到,則返回NULL。 裁判測試程式樣例: 輸入樣例1: 輸出樣例1: 輸入樣例2: 輸出樣例2: char *search(char *s, char *t){ char ...
  • 問:我最近聯繫到了一家培訓機構 ,從他們公司瞭解到,學習Java從零基礎到就業就4個月,我很擔心這4個月究竟能學到多少。在這4個月究竟能不能學到東西。還有就是就業率還有失業率 答: 4個月學肯定是可以學習到知識的,但是就業就不敢保證了。要看你學習的效果了。四個月的課程很趕,很多都在趕進度,能不能跟得 ...
  • 1、不僅方法要public,類也要是public許可權 2、修改Java文件字元集 沒有完成,決定修改eclipse的預設字元集為gdk,需要使用時再修改為utf-8 ...
  • Python re 模塊 TOC 介紹 作用 正則表達式語法 貪婪和非貪婪 普通字元和特殊字元 re modul level 方法 正則表達式對象 匹配對象 常用例子 註意事項 Jamie Zawinski said: Some people,when confronted with a probl ...
  • 一、Spring Boot 介紹 Spring Boot 是由Pivotal團隊提供的一種全新的微服務框架,其設計目的是用來簡化Spring應用的初始化搭建以及開發過程。該框架使用了特定的方式來進行配置,從而使開發人員不再需要定義樣板話的配置。通過這種方式,Spring Boot致力於在蓬勃發展的快 ...
  • package com.wz.poi; import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.io.Outpu ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...