Netty構建分散式消息隊列(AvatarMQ)設計指南之架構篇

来源:http://www.cnblogs.com/jietang/archive/2016/08/25/5808735.html
-Advertisement-
Play Games

目前業界流行的分散式消息隊列系統(或者可以叫做消息中間件)種類繁多,比如,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特點是,都有一個消息中轉路由節點,按照消息隊列裡面的專業術 ...


  目前業界流行的分散式消息隊列系統(或者可以叫做消息中間件)種類繁多,比如,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特點是,都有一個消息中轉路由節點,按照消息隊列裡面的專業術語,這個角色應該是broker。整個消息系統通過這個broker節點,進行從消息生產者Producer到消費者Consumer的消息路由。當然了,生產者和消費者可以是多對多的關係。消息路由的時候,可以根據關鍵字(專業的術語叫topic),進行關鍵字精確匹配、模糊匹配、廣播方式的消息路由。

  簡單來說,一個極簡的分散式消息隊列系統主要的構成模塊有:

  Broker:簡單來說就是消息隊列伺服器實體。

  Producer:消息的生產者,主要用來發送消息給消費者。

  Consumer:消息的消費者,主要用來接收生產者的消息。

  Routing Key:路由關鍵字(Topic),主要用來控制生產者和消費者之間的發送與接收消息的對應關係。

  Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

  到此為止,我們明白了一個分散式消息隊列系統的主要構成模塊,現在本人就通過Netty,這個優秀的Java NIO網路通訊框架,構建一個支持上述應用場景的分散式消息隊列系統,本人把其命名為AvatarMQ。後續我會基於這個開源項目,連載出基於Netty構建分散式消息隊列系統系列相關的文章,闡明主要的設計思路、組織結構、模塊劃分依據、類圖結構等等。為了說明方便,後續本文中,如果沒有特殊說明,有涉及基於Netty構建的分散式消息隊列系統,就是指代AvatarMQ。由於整個開源項目涉及的代碼量比較多,所以希望大家在本人編寫系列博客文章的基礎上,耐心地理解、分析其中的代碼模塊,相信一定不會讓您失望!

  AvatarMQ基於Netty,所以首先,你要能清楚的理解Netty是什麼?它能做什麼?有興趣的朋友可以關註一下Netty項目的官網(http://netty.io/),上面有很詳細的入門文章介紹。雖然都是英文的,但是這些一手的資料更具權威性,值得花時間深入研究探索,畢竟現在流行的雲計算、大數據領域成功的開源項目比如Hadoop、Storm等等,網路通信層這塊全部依賴Netty,可見Netty的功能強大。

  基於Netty可以開發定製高性能、高可靠性的Java企業級服務端應用,而本文是我,在繼利用Netty構建高性能RPC伺服器系列文章之後,又一個基於Netty開發的分散式消息隊列系統(AvatarMQ)。此外AvatarMQ還大量使用了Java多線程的相關類庫。所以希望在此之前,大家能回憶複習一下,這樣理解起來會更加得心應手、事半功倍。

  AvatarMQ是基於Netty構建的分散式消息隊列系統,支持多個生產者和多個消費者之間的消息路由、傳遞。主要特性如下:

  • AvatarMQ基於Java語言進行編寫,網路通訊依賴Netty。
  • 生產者和消費者的關係可以是一對多、多對一、多對多的關係。
  • 若幹個消費者可以組成消費者集群,生產者可以向這個消費者集群投遞消息。
  • 消費者集群對於有共同關註點的消費者支持消息的負載均衡策略。
  • 支持動態新增、刪除生產者、消費者。
  • 目前僅僅支持關鍵字的精確匹配路由,後續會逐漸完善。
  • 消息隊列伺服器Broker基於Netty的主從事件線程池模型開發設計。
  • 網路消息序列化採用Kryo進行消息的網路序列化傳輸。
  • Broker的消息派發、負載均衡、應答處理(ACK)基於非同步多線程模型進行開發設計。
  • Broker消息的投遞,目前支持嚴格的消息順序。其中Broker還支持消息的緩衝派發,即Broker會緩存一定數量的消息之後,再批量分配給對此消息感興趣的消費者。

  AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ

  整個開源項目依賴的jar包請參考:https://github.com/tang-jie/AvatarMQ/blob/master/nbproject/project.properties

  另外,值得註意的是:

  AvatarMQ使用的Netty是基於4.0版本(下載地址:http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2)。

  消息序列化使用的Kryo是基於kryo-3.0.3版本(下載地址:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3)。

  請大家自行去官網下載使用。

  現在,現在言歸正傳,我們先來看下整合AvatarMQ項目的軟體架構圖:

  

  從上述圖例中,我們可以很清楚的看到:生產者和消費者之間是通過Broker進行消息的路由和轉發,同時Broker還負責應答生產者和接收消費者的處理應答。

  在瞭解了,整個AvatarMQ的組織架構之後,我們再來實際運行一下AvatarMQ!

  首先,先啟動一下Broker伺服器(對應代碼:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/spring/AvatarMQServerStartup.java

  如果一切正常,終端控制台會列印如下輸出:

  

  接著,我們就來實際驗證一下AvatarMQ的消息推送功能。

  1、生產者發送1條消息給關註這條消息的消費者。我們先啟動消費者,再啟動生產者。

  其中消費者1的測試代碼(AvatarMQConsumer1.java)如下所示:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer1.java
 * @description:AvatarMQConsumer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer1 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer1 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-1", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者1的測試代碼(AvatarMQProducer1.java)如下所示,其含義是發送1條消息,給關註“AvatarMQ-Topic-1”主題的消費者:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer1.java
 * @description:AvatarMQProducer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer1 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-1");
        producer.setClusterId("AvatarMQCluster");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送開始", 50, "*"));

        for (int i = 0; i < 1; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer1[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer1 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送完畢", 50, "*"));
    }
}

  首先我們先來啟動消費者,如果一切正常,控制台輸出結果為:

  這個時候我們再運行生產者,發送一條消息給消費者。啟動生產者之後,控制台輸出結果如下:

  那現在,我們切回去看下消費者是否收到生產者的消息了呢?

  非常正確,我們的消費者果然收到了生產者發送過來的消息。

 

  2、生產者發送1條消息給不關註這條消息的消費者。

  首先說明的是,代碼樣例還是基於上述的AvatarMQConsumer1.java、AvatarMQProducer1.java。只不過這次是生產者發送的主題改成:“AvatarMQ-Topic-Test”,消費者關註的主題改成“AvatarMQ-Topic-1”。然後依次啟動消費者、生產者。下麵是實際的運行情況:

  生產者成功發送消息:

  那按照要求,消費者應該無法收到生產者的這條消息,實際情況是不是這樣呢?事實勝於雄辯,看如下截圖所示:

  消費者依然處理啟動監聽狀態,說明完全符合我們的預期。

 

  3、生產者發送N條消息(這裡是發送100條消息)給一個消費者集群(有2個消費者組成,並且這2個消費者關註的消息主題topic是相同的)。

  我們先啟動2個消費者,再啟動生產者。消費者代碼參考:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer2.java
 * @description:AvatarMQConsumer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer2 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer2 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-2", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster2");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者代碼參考(目的是發送100條消息)給消費者集群。

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer2.java
 * @description:AvatarMQProducer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer2 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-2");
        producer.setClusterId("AvatarMQCluster2");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送開始", 50, "*"));

        for (int i = 0; i < 100; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer2[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer2 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送完畢", 50, "*"));
    }
}

  我們依次啟動消費者AvatarMQConsumer2兩次,這個時候終端控制台依次輸出:

  這個時候我們再啟動生產者,運行截圖如下:

  說明生產者發送了100條消息出去,看下我們消費者1接收的情況:

  繼續看下我們的消費者2,消息接收的情況,截圖如下:

  最終統計一下,消費者1,接收的消息編號都是奇數,一共50個。消費者2,接收到的消息編號都是偶數,一共50個。兩個消費者接收的消息總數加起來,剛好等於生產者發送的消息總數100個,完全符合我們的預期!另外消費者1、消費者2都收到了來自生產者的消息,說明Broker進行了消息的路由傳遞。

  4、多個生產者和多個消費者的消息傳遞,以及動態新增、刪除生產者、消費者。

  這個就交給大家自行測試了,由於篇幅有限,在此本人就不一一闡述。

 

  到目前為止,相信大家對於AvatarMQ所具備的基本功能,有了一個大致的印象。當然,AvatarMQ還有一些美中不足,比如:

  • 不支持消息的刷盤存儲,可能由於系統Crash,造成消息的丟失。後續需要接入一個存儲系統(基於Java NIO),保證消息的持久序列化。
  • AvatarMQ的生產者、消費者模塊,要進一步支持,斷網重連Broker的功能,確保在Broker重啟的情況下,把在途的消息繼續發送、接收完畢。
  • Broker單點的問題,根據高可用性集群HA(High Available)的標準,Broker也要有主節點和從節點機制。在主節點宕機的情況,從節點要能灰度過渡,不至於Broker主節點宕機,整個AvatarMQ消息系統陷入癱瘓狀態。
  • 消息應答失敗,還未支持重試功能。
  • 當然還有一些未知的bug,有待發現和修複。
  • AvatarMQ的處理性能,未經歷過生產系統實際檢驗,暫時無法保證其安全和可靠性。

  由於代碼編寫、測試等等工作,都是本人利用工作之餘的時間完成,時間點上比較倉促。加上本人的技術水平有限,難免有說的不對及寫得不好的地方,或者其中應該有更好的解決方案。歡迎廣大同行、愛好者線上下進行學習交流,有什麼寶貴的建議和觀點,懇請批評指正,不吝賜教。雖然AvatarMQ和業界主流、久經考驗的消息隊列系統,在處理性能、可靠性上,肯定還有不小的差距。但是可以基於此,加深對分散式消息隊列的理解,做到知其然知其所以然,何樂而不為?

  最後,本人後續會逐漸推出“基於Netty構建的分散式消息隊列系統(AvatarMQ)”,架構設計、原理分析的詳解連載文章,敬請期待!

  PS:目前AvatarMQ已經開源,整個項目托管到github,對應的網址為:https://github.com/tang-jie/AvatarMQ,歡迎有興趣的同行朋友、愛好者關註下載,如果覺得還不錯,可以點擊Star收藏、關註。當然,你還可以點擊推薦本文,也算是對我辛苦付出的一點支持和回報,謝謝大家!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Web框架本質 眾所周知,對於所有的Web應用,本質上其實就是一個socket服務端,用戶的瀏覽器其實就是一個socket客戶端。 上述通過socket來實現了其本質,而對於真實開發中的python web程式來說,一般會分為兩部分:伺服器程式和應用程式。伺服器程式負責對socket伺服器進行封裝, ...
  • 每一項技術用的人多了,就會有人將其進行優化,做成一個簡單、實用、大眾化的工具,這對於初識者來說是非常方便的,但是對於長久學習或工作這方面的人技術人員來說是不可取的,所以還是要學習基礎的實用方法。因此,我就在ubuntu下配置了Apache伺服器來更深入的學習。 這是一個預設安裝的方法,如果要指定 步 ...
  • 解決的問題 HBase的Write Ahead Log (WAL)提供了一種高併發、持久化的日誌保存與回放機制。每一個業務數據的寫入操作(PUT / DELETE)執行前,都會記賬在WAL中。 如果出現HBase伺服器宕機,則可以從WAL中回放執行之前沒有完成的操作。 本文主要探討HBase的WAL ...
  • 由於網站項目需要上傳和下載資源,資源大小有幾M到幾百M不等,傳輸的效率暫不考慮,所以先搭建FTP伺服器供項目使用。 下載Filezilla Server後安裝,沒特殊要求,下一步直至結束。 安裝完成後打開程式居然報錯: 錯誤提示有2個,第一個需要設置被動模式的路由,【Edit】->【Setting】 ...
  • 新建對話框應用程式,刪除自動生成的控制項後,拖拽一個CListCtrl控制項,綁定變數名為:m_listctrl。在對話框初始化成員函數OnInitDialog()中鍵入以下代碼即可實現自適應系統的CListCtrl控制項! 下麵是這兩種樣式的對比圖: ...
  • 這裡想說一下在集合框架前需要理解的小知識點,也是個人的膚淺理解,不知道理解的正不正確,請大家多多指教。這裡必須談一下java的泛型,因為它們聯繫緊密,我們先看一下這幾行代碼: 這裡主要想測試一下這兩個類是不是相等的,根據我之前的認識,這應該是不相等的,但是運行輸出的結果是: class java.u ...
  • 八、BPMN 2.0流程圖詳解 BPMN 2.0的標準的出現是好事,用戶不在被某個工作流開發商綁架或者在工作流中開發妥協,Activiti作為BPMN標準的一套解決方案,使得用戶在選擇工作流框架時可以平滑的遷移過渡。也有負面的不好的消息,就是BPMN標準是大量開會討論和開發商妥協的結果(一般這是在做 ...
  • 前言: 好久沒有寫博客了,上次發表博客還是在5月的時候,主要是4月多入職的新公司,5月份就開始做項目一直忙到這個月的中旬項目上線才偷得浮生半日閑,但是項目上線後客戶還是隔三差五的提個問題,不是改BUG就是添加新的東西,本來想著一邊看編程思想一邊把每章的讀後感寫出來發出來的,但是天天忙著加班雖然也抽時 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...