分散式消息通信ActiveMQ

来源:https://www.cnblogs.com/wangruihua-521/archive/2018/12/21/10153558.html
-Advertisement-
Play Games

消息中間件 消息中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並且基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分散式架構下擴展進程之間的通信。 消息中間件能做什麼 消息中間件主要解決分散式系統之間消息的傳遞問題 ,能夠屏蔽各種平臺以及協議之間的特性,實現應 ...


消息中間件

消息中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並且基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分散式架構下擴展進程之間的通信。

消息中間件能做什麼

消息中間件主要解決分散式系統之間消息的傳遞問題 ,能夠屏蔽各種平臺以及協議之間的特性,實現應用之間的協同。

示例:

電商平臺中的註冊功能,用戶註冊不單是向資料庫insert,可能還需要贈送積分,發送郵件,發送簡訊等系列操作。

假如:每個操作都耗時1s,那麼註冊過程就需要耗時4s才能響應給用戶。從註冊這個服務可以看出,每個子操作都是獨立的,同時,基於領域劃分以後,它們都屬於不同的子域。所以我們可以對這些子操作實現非同步化操作。類似多線程並行處理。

如何實現非同步化?用多線程能實現嗎?多線程當然可以實現,只是,消息的持久化、消息的重發這些條件,多線程 並不能滿足.所以需要藉助一些開源的消息中間件來解決。 而分散式消息隊列就是一個很好的解決辦法。通過引入分散式隊列,大大提升程式的處理效率,並且還解決了各個模塊之間的耦合問題。

分散式消息隊列解決的場景:

引入消息中間件後(非同步處理),電商平臺中的註冊架構圖變為

電商中的秒殺:

 

用戶提交過來的請求,先寫入消息隊列。消息隊列是有長度的,如果消息隊列超過指定長度,直接拋棄。

秒殺的 具體核心處理業務,接收消息隊列中消息進行處理。這裡的消息處理能力取決於消費端本身的吞吐量。

解耦、非同步化、流量整形、數據的最終一致性(最大化的重試完成數據一致性)

ActiveMQ 簡介

ActiveMQ

ActiveMQ 是完全基於JMS 規範實現的一個消息中間件產品,是Apache 開源基金會研發的消息中間件。ActiveMQ 主要應用在分散式系統架構中,幫助構建高可用、高性能、可伸縮的企業級面向服務的系統。

ActiveMQ 特性

  • 多語言和協議編寫客戶端

    • 語言:Java、C、C++、C#、Ruby、Perl、Python、PHP

    • 協議:openwire、stomp、REST、ws、notification、xmpp、AMQP

  • 完全支持JMS1.1和J2EE1.4規範

  • 對Spring的支持,ActiveMQ可以很容易的嵌入到spring模塊中

ActiveMQ 下載安裝啟動

下載地址

http://activemq.apache.org/activemq-5158-release.html

解壓

tar -zxvf apache-activemq-5.15.8-bin.tar.gz

啟動服務

  • cd apache-activemq-5.15.8/bin

    sh activemq start

  • 啟動並帶指定日誌文件 sh activemq start > /tmp/activemqlog

關閉服務

  • sh activemq stop

監控地址

http://192.168.15.134:8161/admin/ admin admin

ActiveMQ 的埠61616

  • 預設為61616

  • 檢查是否成功啟動ActiveMQ

    • netstat -an|grep 61616

JMS 基本概念和模型

JMS的定義

JMS(Java Message Service) :面向消息中間件的API

MOM(Message Oriented Middleware):面向消息中間件

Java 消息服務是Java平臺中關於面向消息中間件的API,用於兩個程式 之間,或者分散式系統中發送消息,進行非同步通信。

JMS 是一個與具體平臺無關的API,絕大多數MOM 提供商都對JMS提供了支持。ActiveMQ就是其中的一個實現。

MOM

MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操作。 MOM 需要提供API和管理工具。客戶端使用API調用,把消息發送到由提供者管理的目的地。在發送消息後,客戶端會繼續執行其他工作,並且在接收方收到這個消息確認之前,提供者一直保留該消息。

MOM 的特點

  • 消息非同步接收,發送者不需要等待消息接受者響應

  • 消息可靠接收,確保消息中間件可靠保存。只有接收方收到消息後才刪除消息

開源JMS提供商

JbossMQ(jboss4)、Jboss messaging(jboss5)、joram、ubermq、mantamq、openjms ...

JMS 規範

JMS 規範的目的是為了使得Java 應用程式能夠訪問現有MOM(消息中間件)系統,形成一套統一的標準規範,解決不同消息中間件之間的協作問題。

  • 不同消息的傳遞域,點對點消息傳送和發佈/訂閱消息傳送

  • 提供接收同步和非同步消息的工具

  • 對可靠消息傳送的支持

  • 常見消息格式,例如流、文本和位元組

JMS 的體繫結構

 

JMS 的基本功能

JMS 的基本功能是用於和麵向消息中間件相互通信的應用程式的介面

消息傳遞域

  • p2p(point-2-point) 點對點消息傳遞域

    • 每個消息只能有一個消費者(離線存儲)

      • 類似QQ聊天的私聊

    • 生產者和消費者之間沒有時間上的相關性,無論消費者在生產者發送消息的時候是否處於運行狀態,都可以提取消息

    • 如果session關閉時,有一些消息已經被收到,但是沒有被簽收,消費者下一次連接到相同對列時,這些消息仍然會被接收

    • 如果用戶在receive 方法中設定了消息的選擇條件(消息過濾)

    • 如果是持久化消息,消息會被持久化保存,直到消息被簽收

  • 發佈訂閱(publish/subscribe)消息傳遞域

    • 每個消息有多個消費者

      • 類似QQ群聊

    • 生產者和消費者有時間上的相關性

      • 訂閱一個主題的消費者只能消費自它訂閱之後發佈的消息。

      • JMS 規範允許客戶創建持久訂閱,一定程度上降低了時間的相關性要求

      • 持久訂閱允許消費者消費它在未處於激活狀態時發送的消息

    • 持久化訂閱和非持久化訂閱

    • 在非持久化訂閱的前提下,不能恢復或者重新指派一個未簽收的消息;

    • 如果所有消息必須要簽收,則使用持久訂閱

消息的組成

消息頭(Header)

消息頭包含消息的識別信息和路由信息

消息頭包含一些標準的屬性:

  • JMSDestination

    • 消息發送的目的地,queue或者topic

  • JMSDeliveryMode

    • 傳送模式,持久化模式和非持久模式

  • JMSPrority

    • 消息優先順序(優先順序分為10個級別,從0最低-9最高)

    • 如果不設定優先順序,預設級別4,需要註意的是,JMS Provider 並不一定保證按照優先順序的順序提交

  • JMSMessageID

    • 唯一識別每個消息的標識

消息體

就是我們需要傳遞的消息的內容

JMS API定義了5種消息體格式:

  • TextMessage

    • java.lang.String 對象,如xml文件內容

  • MapMessage

    • 名/值對的集合,名是String 對象,值可以是Java 任何基本類型

  • BytesMessage

    • 位元組流

  • StreamMessage

    • Java 中的輸入輸出流

  • ObjectMessage

    • Java 中的可序列化對象

  • Message

    • 沒有消息體,只有消息頭和屬性

消息的屬性

按類型分為:

  • 應用設置的屬性

    • Message.setStringProperty(key,value);

  • 標準屬性

    • 使用“JMSX” 作為屬性名的首碼

  • 消息中間件定義的屬性

    • JMS Provider 特定的屬性

JMS 的可靠機制

消息的確認方式

消息的處理階段:

  • 客戶端接收消息

  • 客戶端處理消息

  • 消息被確認

會話存在兩種機制:

  • 事務性會話

    • createSession(boolean transacted, int acknowledgeMode)

      • Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    • session.commit() //消息被確認 事務提交意味著生產的所有消息被髮送,消費的所有消息被確認

    • session.rollback(); //重新處理 消息沒有被提交,沒有被處理,消費端的所有消息被恢復,並且重新被提交, 表示一個事務結束, 另一個事務會開始。事務回滾意味著生產的所有消息被銷毀,消費的所有消息 被恢復並重新提交,除非它們已經過期

    • 通過session.commit() //完成事務的簽收

  • 非事務性會話

    • transacted 設置為FALSE

    • Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

    • 客戶端簽收模型

    • Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

    • 那麼需要手動簽收

    • textMessage.acknowledge();

    • 客戶端延遲確認,消息可能重覆消費

      • Session session = connection.createSession(Boolean.FALSE, DUPS_OK_ACKNOWLEDGE);

事務性的自動確認

非事務性的自動確認和手動確認

消息的持久化存儲

持久化(存儲在資料庫或磁碟)

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

對於持久消息,消息提供者會使用存儲-轉發機制,先將消息存儲到穩定的介質中,等消息發送成功後再刪除。如果JMS Provider 宕機,那麼這些未送達的消息則不會丟失,JMS Provider 恢復正常後,會重新讀取這些消息,並傳送給對應的消費者。

非持久化(存儲在記憶體中)

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

對於非持久化消息,JMS Provider 不會將它存到文件、資料庫等穩定介質中。也就是說非持久消息,存儲在記憶體中,如果JMS Provider 宕機,那麼非持久化消息會丟失。

持久訂閱

  • 持久訂閱者和非持久訂閱者針對的Domain 是Pub/Sub,而不是P2P

  • 當Broker 發送消息給訂閱者時,如果訂閱者處於未激活狀態,持久訂閱者可以收到消息,而非持久訂閱者則收不到消息。

  • 當持久訂閱者處於未激活狀態時,Broker 需要為持久訂閱者保存消息,如果持久訂閱者訂閱的消息太多則會溢出。

  • 持久訂閱時,客戶端向JMS 伺服器註冊一個自己身份的ID, 當這個客戶端處於離線時,JMS Provider 會為這個ID 保存所有發送到主題的消息,當客戶再次連接到 JMS Provider時,會根據自己的ID得到所有當自己處於離線時發送到主題的消息。

  • 持久訂閱的方式(消費端)

    • connection.setClientID("test");

    • Topic destination=session.createTopic("myTopic");

    • MessageConsumer consumer=session.createDurableSubscriber(destination,"test");

JMS 規範結合ActiveMQ 實現消息發送

案例架構圖

 

示例代碼

引入Jar 包

 <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.8</version>
 </dependency> 

生產端

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSQueueProducer {
    public static void main(String args[]) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616");
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //創建目的地
            Destination destination = session.createQueue("myQueue");
            //創建發送者
            MessageProducer producer = session.createProducer(destination);
            //持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMessage = session.createTextMessage("Hello,World");
            producer.send(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 消費端

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import javax.xml.soap.Text;
public class JMSQueueConsumer {

    public static void main(String args[]) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616");
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //創建目的地
            Destination destination = session.createQueue("myQueue");
            //創建接收者
            MessageConsumer  consumer = session.createConsumer(destination);
            //接收消息    阻塞方式監聽消息
            TextMessage textMessage =(TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit(); //表示消息被自動確認
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null)
            {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、面向過程 我們是怎麼思考和解決上面的問題的呢? 答案是:我們自己的思維一直按照步驟來處理這個問題,這是我們常規思維,這就是所謂的面向過程POP編程 二、面向過程POP為什麼轉換為面向對象OOP 面向過程想的思想步驟越多,變化越多,是無法掌控的,所以有時候非常複雜,就比如我們拿起來手機玩游戲如果按 ...
  • 作者按:《每天一個設計模式》旨在初步領會設計模式的精髓,目前採用 和`python`兩種語言實現。誠然,每種設計模式都有多種實現方式,但此小冊只記錄最直截了當的實現方式 :) 個人技術博客 "godbmw.com" 歡迎來玩! 每周至少 1 篇原創技術分享,還有開源教程(webpack、設計模式)、 ...
  • 觀察者模式 Observer,又被稱為發佈訂閱模式或者源監聽,是一種行為型設計模式,也是也是很多系統中常常用到的一種消息處理機制,Java內置了對觀察者模式的支持,藉助於觀察者模式可以很好地完成消息的發佈與訂閱,本文對觀察者模式進行了簡介,並且給出了意圖和結構的解析,並且給出了Java代碼示例。 ...
  • 廢話在前 什麼是代碼覆蓋率 來自 百度百科 代碼覆蓋(Code coverage)是軟體測試中的一種度量,描述程式中源代碼被測試的比例和程度,所得比例稱為代碼覆蓋率。 開發人員為何關註? 在我們的開發過程中,經常要用各種方式進行自測,或是各種 xUnit 系列,或是 postman,或是直接curl ...
  • 一、引言 設計模式是軟體開發人員在軟體開發過程中面臨的一般問題的解決方案。這些解決方案是眾多軟體開發人員經過相當長的一段時間的試驗和錯誤總結出來的。學習設計模式會幫助你更好的理解面向對象。 設計模式有多達23種,這些模式可以分為三大類:創建型模式(Creational Patterns)、結構型模式 ...
  • 156.mvc:view-controller有什麼作用? 157.<mvc:annotation-driven />作用是什麼? 158.springMVC中form標簽如何使用 159.springMVC如何訪問靜態資源 160.SpringMVC如何範圍Json數據格式 ...
  • 前言 開心一刻 我和兒子有個共同的心愿,出國旅游。昨天兒子考試得了全班第一,我跟媳婦合計著帶他出國見見世面,吃晚飯的時候,一家人開始了討論這個。我:“兒子,你的心愿是什麼?”,兒子:“吃漢堡包”,我:“往大了說”,兒子:“變形金剛”,我:“今天你爹說了算,想想咱倆共同的心愿”,兒子怯生生的瞅了媳婦一 ...
  • 寫在前面 之前幾篇文章都是在寫圖片相關的爬蟲,今天寫個留言板爬出,為另一套數據分析案例的教程做做準備,作為一個河北人,遵紀守法,有事投訴是必備的技能,那麼咱看看我們大河北人都因為什麼投訴過呢? 今天要爬取的網站地址 ,一遍爬取一遍嘀咕,別因為爬這個網站在去喝茶,再次聲明,學習目的,切勿把人家網站爬癱 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...