目前業界流行的分散式消息隊列系統(或者可以叫做消息中間件)種類繁多,比如,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特點是,都有一個消息中轉路由節點,按照消息隊列裡面的專業術 ...
目前業界流行的分散式消息隊列系統(或者可以叫做消息中間件)種類繁多,比如,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特點是,都有一個消息中轉路由節點,按照消息隊列裡面的專業術語,這個角色應該是broker。整個消息系統通過這個broker節點,進行從消息生產者Producer到消費者Consumer的消息路由。當然了,生產者和消費者可以是多對多的關係。消息路由的時候,可以根據關鍵字(專業的術語叫topic),進行關鍵字精確匹配、模糊匹配、廣播方式的消息路由。
簡單來說,一個極簡的分散式消息隊列系統主要的構成模塊有:
Broker:簡單來說就是消息隊列伺服器實體。
Producer:消息的生產者,主要用來發送消息給消費者。
Consumer:消息的消費者,主要用來接收生產者的消息。
Routing Key:路由關鍵字(Topic),主要用來控制生產者和消費者之間的發送與接收消息的對應關係。
Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
到此為止,我們明白了一個分散式消息隊列系統的主要構成模塊,現在本人就通過Netty,這個優秀的Java NIO網路通訊框架,構建一個支持上述應用場景的分散式消息隊列系統,本人把其命名為AvatarMQ。後續我會基於這個開源項目,連載出基於Netty構建分散式消息隊列系統系列相關的文章,闡明主要的設計思路、組織結構、模塊劃分依據、類圖結構等等。為了說明方便,後續本文中,如果沒有特殊說明,有涉及基於Netty構建的分散式消息隊列系統,就是指代AvatarMQ。由於整個開源項目涉及的代碼量比較多,所以希望大家在本人編寫系列博客文章的基礎上,耐心地理解、分析其中的代碼模塊,相信一定不會讓您失望!
AvatarMQ基於Netty,所以首先,你要能清楚的理解Netty是什麼?它能做什麼?有興趣的朋友可以關註一下Netty項目的官網(http://netty.io/),上面有很詳細的入門文章介紹。雖然都是英文的,但是這些一手的資料更具權威性,值得花時間深入研究探索,畢竟現在流行的雲計算、大數據領域成功的開源項目比如Hadoop、Storm等等,網路通信層這塊全部依賴Netty,可見Netty的功能強大。
基於Netty可以開發定製高性能、高可靠性的Java企業級服務端應用,而本文是我,在繼利用Netty構建高性能RPC伺服器系列文章之後,又一個基於Netty開發的分散式消息隊列系統(AvatarMQ)。此外AvatarMQ還大量使用了Java多線程的相關類庫。所以希望在此之前,大家能回憶複習一下,這樣理解起來會更加得心應手、事半功倍。
AvatarMQ是基於Netty構建的分散式消息隊列系統,支持多個生產者和多個消費者之間的消息路由、傳遞。主要特性如下:
- AvatarMQ基於Java語言進行編寫,網路通訊依賴Netty。
- 生產者和消費者的關係可以是一對多、多對一、多對多的關係。
- 若幹個消費者可以組成消費者集群,生產者可以向這個消費者集群投遞消息。
- 消費者集群對於有共同關註點的消費者支持消息的負載均衡策略。
- 支持動態新增、刪除生產者、消費者。
- 目前僅僅支持關鍵字的精確匹配路由,後續會逐漸完善。
- 消息隊列伺服器Broker基於Netty的主從事件線程池模型開發設計。
- 網路消息序列化採用Kryo進行消息的網路序列化傳輸。
- Broker的消息派發、負載均衡、應答處理(ACK)基於非同步多線程模型進行開發設計。
- Broker消息的投遞,目前支持嚴格的消息順序。其中Broker還支持消息的緩衝派發,即Broker會緩存一定數量的消息之後,再批量分配給對此消息感興趣的消費者。
AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ
整個開源項目依賴的jar包請參考:https://github.com/tang-jie/AvatarMQ/blob/master/nbproject/project.properties
另外,值得註意的是:
AvatarMQ使用的Netty是基於4.0版本(下載地址:http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2)。
消息序列化使用的Kryo是基於kryo-3.0.3版本(下載地址:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3)。
請大家自行去官網下載使用。
現在,現在言歸正傳,我們先來看下整合AvatarMQ項目的軟體架構圖:
從上述圖例中,我們可以很清楚的看到:生產者和消費者之間是通過Broker進行消息的路由和轉發,同時Broker還負責應答生產者和接收消費者的處理應答。
在瞭解了,整個AvatarMQ的組織架構之後,我們再來實際運行一下AvatarMQ!
首先,先啟動一下Broker伺服器(對應代碼:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/spring/AvatarMQServerStartup.java)
如果一切正常,終端控制台會列印如下輸出:
接著,我們就來實際驗證一下AvatarMQ的消息推送功能。
1、生產者發送1條消息給關註這條消息的消費者。我們先啟動消費者,再啟動生產者。
其中消費者1的測試代碼(AvatarMQConsumer1.java)如下所示:
package com.newlandframework.avatarmq.test; import com.newlandframework.avatarmq.consumer.AvatarMQConsumer; import com.newlandframework.avatarmq.consumer.ProducerMessageHook; import com.newlandframework.avatarmq.msg.ConsumerAckMessage; import com.newlandframework.avatarmq.msg.Message; /** * @filename:AvatarMQConsumer1.java * @description:AvatarMQConsumer1功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQConsumer1 { private static ProducerMessageHook hook = new ProducerMessageHook() { public ConsumerAckMessage hookMessage(Message message) { System.out.printf("AvatarMQConsumer1 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody())); ConsumerAckMessage result = new ConsumerAckMessage(); result.setStatus(ConsumerAckMessage.SUCCESS); return result; } }; public static void main(String[] args) { AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-1", hook); consumer.init(); consumer.setClusterId("AvatarMQCluster"); consumer.receiveMode(); consumer.start(); } }
生產者1的測試代碼(AvatarMQProducer1.java)如下所示,其含義是發送1條消息,給關註“AvatarMQ-Topic-1”主題的消費者:
package com.newlandframework.avatarmq.test; import com.newlandframework.avatarmq.msg.Message; import com.newlandframework.avatarmq.msg.ProducerAckMessage; import com.newlandframework.avatarmq.producer.AvatarMQProducer; import org.apache.commons.lang3.StringUtils; /** * @filename:AvatarMQProducer1.java * @description:AvatarMQProducer1功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQProducer1 { public static void main(String[] args) throws InterruptedException { AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-1"); producer.setClusterId("AvatarMQCluster"); producer.init(); producer.start(); System.out.println(StringUtils.center("AvatarMQProducer1 消息發送開始", 50, "*")); for (int i = 0; i < 1; i++) { Message message = new Message(); String str = "Hello AvatarMQ From Producer1[" + i + "]"; message.setBody(str.getBytes()); ProducerAckMessage result = producer.delivery(message); if (result.getStatus() == (ProducerAckMessage.SUCCESS)) { System.out.printf("AvatarMQProducer1 發送消息編號:%s\n", result.getMsgId()); } Thread.sleep(100); } producer.shutdown(); System.out.println(StringUtils.center("AvatarMQProducer1 消息發送完畢", 50, "*")); } }
首先我們先來啟動消費者,如果一切正常,控制台輸出結果為:
這個時候我們再運行生產者,發送一條消息給消費者。啟動生產者之後,控制台輸出結果如下:
那現在,我們切回去看下消費者是否收到生產者的消息了呢?
非常正確,我們的消費者果然收到了生產者發送過來的消息。
2、生產者發送1條消息給不關註這條消息的消費者。
首先說明的是,代碼樣例還是基於上述的AvatarMQConsumer1.java、AvatarMQProducer1.java。只不過這次是生產者發送的主題改成:“AvatarMQ-Topic-Test”,消費者關註的主題改成“AvatarMQ-Topic-1”。然後依次啟動消費者、生產者。下麵是實際的運行情況:
生產者成功發送消息:
那按照要求,消費者應該無法收到生產者的這條消息,實際情況是不是這樣呢?事實勝於雄辯,看如下截圖所示:
消費者依然處理啟動監聽狀態,說明完全符合我們的預期。
3、生產者發送N條消息(這裡是發送100條消息)給一個消費者集群(有2個消費者組成,並且這2個消費者關註的消息主題topic是相同的)。
我們先啟動2個消費者,再啟動生產者。消費者代碼參考:
package com.newlandframework.avatarmq.test; import com.newlandframework.avatarmq.consumer.AvatarMQConsumer; import com.newlandframework.avatarmq.consumer.ProducerMessageHook; import com.newlandframework.avatarmq.msg.ConsumerAckMessage; import com.newlandframework.avatarmq.msg.Message; /** * @filename:AvatarMQConsumer2.java * @description:AvatarMQConsumer2功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQConsumer2 { private static ProducerMessageHook hook = new ProducerMessageHook() { public ConsumerAckMessage hookMessage(Message message) { System.out.printf("AvatarMQConsumer2 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody())); ConsumerAckMessage result = new ConsumerAckMessage(); result.setStatus(ConsumerAckMessage.SUCCESS); return result; } }; public static void main(String[] args) { AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-2", hook); consumer.init(); consumer.setClusterId("AvatarMQCluster2"); consumer.receiveMode(); consumer.start(); } }
生產者代碼參考(目的是發送100條消息)給消費者集群。
package com.newlandframework.avatarmq.test; import com.newlandframework.avatarmq.msg.Message; import com.newlandframework.avatarmq.msg.ProducerAckMessage; import com.newlandframework.avatarmq.producer.AvatarMQProducer; import org.apache.commons.lang3.StringUtils; /** * @filename:AvatarMQProducer2.java * @description:AvatarMQProducer2功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQProducer2 { public static void main(String[] args) throws InterruptedException { AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-2"); producer.setClusterId("AvatarMQCluster2"); producer.init(); producer.start(); System.out.println(StringUtils.center("AvatarMQProducer2 消息發送開始", 50, "*")); for (int i = 0; i < 100; i++) { Message message = new Message(); String str = "Hello AvatarMQ From Producer2[" + i + "]"; message.setBody(str.getBytes()); ProducerAckMessage result = producer.delivery(message); if (result.getStatus() == (ProducerAckMessage.SUCCESS)) { System.out.printf("AvatarMQProducer2 發送消息編號:%s\n", result.getMsgId()); } Thread.sleep(100); } producer.shutdown(); System.out.println(StringUtils.center("AvatarMQProducer2 消息發送完畢", 50, "*")); } }
我們依次啟動消費者AvatarMQConsumer2兩次,這個時候終端控制台依次輸出:
這個時候我們再啟動生產者,運行截圖如下:
說明生產者發送了100條消息出去,看下我們消費者1接收的情況:
繼續看下我們的消費者2,消息接收的情況,截圖如下:
最終統計一下,消費者1,接收的消息編號都是奇數,一共50個。消費者2,接收到的消息編號都是偶數,一共50個。兩個消費者接收的消息總數加起來,剛好等於生產者發送的消息總數100個,完全符合我們的預期!另外消費者1、消費者2都收到了來自生產者的消息,說明Broker進行了消息的路由傳遞。
4、多個生產者和多個消費者的消息傳遞,以及動態新增、刪除生產者、消費者。
這個就交給大家自行測試了,由於篇幅有限,在此本人就不一一闡述。
到目前為止,相信大家對於AvatarMQ所具備的基本功能,有了一個大致的印象。當然,AvatarMQ還有一些美中不足,比如:
- 不支持消息的刷盤存儲,可能由於系統Crash,造成消息的丟失。後續需要接入一個存儲系統(基於Java NIO),保證消息的持久序列化。
- AvatarMQ的生產者、消費者模塊,要進一步支持,斷網重連Broker的功能,確保在Broker重啟的情況下,把在途的消息繼續發送、接收完畢。
- Broker單點的問題,根據高可用性集群HA(High Available)的標準,Broker也要有主節點和從節點機制。在主節點宕機的情況,從節點要能灰度過渡,不至於Broker主節點宕機,整個AvatarMQ消息系統陷入癱瘓狀態。
- 消息應答失敗,還未支持重試功能。
- 當然還有一些未知的bug,有待發現和修複。
- AvatarMQ的處理性能,未經歷過生產系統實際檢驗,暫時無法保證其安全和可靠性。
由於代碼編寫、測試等等工作,都是本人利用工作之餘的時間完成,時間點上比較倉促。加上本人的技術水平有限,難免有說的不對及寫得不好的地方,或者其中應該有更好的解決方案。歡迎廣大同行、愛好者線上下進行學習交流,有什麼寶貴的建議和觀點,懇請批評指正,不吝賜教。雖然AvatarMQ和業界主流、久經考驗的消息隊列系統,在處理性能、可靠性上,肯定還有不小的差距。但是可以基於此,加深對分散式消息隊列的理解,做到知其然知其所以然,何樂而不為?
最後,本人後續會逐漸推出“基於Netty構建的分散式消息隊列系統(AvatarMQ)”,架構設計、原理分析的詳解連載文章,敬請期待!
PS:目前AvatarMQ已經開源,整個項目托管到github,對應的網址為:https://github.com/tang-jie/AvatarMQ,歡迎有興趣的同行朋友、愛好者關註下載,如果覺得還不錯,可以點擊Star收藏、關註。當然,你還可以點擊推薦本文,也算是對我辛苦付出的一點支持和回報,謝謝大家!