在通常情況下你在使用消息中間件的時候,都是未經設計的使用,你沒有把應用架構和系統架構邊界搞清楚。消息中間件只是一個純粹的技術工具,當你引入的時候是站在應用架構的角度引入的。這是架構的角度,也是架構的上帝視角,這樣你就不會用到最後發現越來越混亂,而且也無法結合軟體模式、方法論、最佳實踐來綜合提升系統... ...
- 面向EDA(事件驅動架構)的方式來設計你的消息
- AMQP routing key的設計
- RabbitMQ cluster搭建
- Mirror queue policy設置
- 兩個不錯的RabbitMQ plugin 大型應用插件(Sharding、Rederation)
- Queue鏡像失敗手動同步
- 各集群配置同步方式(RabbitMQ export\import)
- 客戶端連接方式(儘量採用AMQP組來動態鏈接)
- RabbitMQ 產線二次產品化封裝(消息補償、發送消息持久化、異常處理、監控頁面、重覆消息剔除)
1.面向EDA(事件驅動架構)的方式來設計你的消息
在通常情況下你在使用消息中間件的時候,都是未經設計的使用,你沒有把應用架構和系統架構邊界搞清楚。消息中間件只是一個純粹的技術工具,當你引入的時候是站在應用架構的角度引入的。這是架構的角度,也是架構的上帝視角,這樣你就不會用到最後發現越來越混亂,而且也無法結合軟體模式、方法論、最佳實踐來綜合提升系統的架構能力。
EDA(Event Driven Architecture,EDA) 事件驅動架構,它是一種用來在SOA或者Micro service中進行的架構模式。它的好處有幾個,柔性具有很高的伸縮性。
(具體參考本人的SOA架構文章:SOA架構設計經驗分享—架構、職責、數據一致性)
既然要EDA就要規劃好你當前的系統邊界之內有多少業務實體,這些實體是圍繞著領域模型而得來。所以這裡不要很主觀的就定義一些你認為的事件,這些事件要根據業務實體中的對象來設計。業務實體起碼是有唯一Identity的。比如,訂單、商品,圍繞著這些實體展開,訂單可能有幾個狀態是比較常用的,創建、支付、配送、取消。商品可能有價格、關鍵屬性修改等等。這些實體的抽象和提煉取決於你當前的業務。
(有關這方面內容可以參考:《領域驅動設計》、《探索CQRS和事件源》)
這些是相對理論的指導思想,有了這些之後你可以落地你的Rabbitmq,這樣你就不會跑偏了。比如,你的消息名稱不會是看起來沒結構和層次的,deliveryMssage(配送消息)。而是應該,order.delivery.ondeliveryEvented(訂單.配送.配送完成事件)這樣的結構。
當你的層次結構不滿足業務需求的時候,你可能還需要進一步明確事件範圍,order.viporder.delivery.ondeliveryEvented(訂單.VIP訂單.配送.配送完成事件)。
上圖是一個事件驅動的基本場景,它最矚目的幾個特性就是這幾個,首先是非同步化的,可以大大提高系統的抗峰值能力。然後就是解耦,這不用說了,設計模式里的觀察者模式沒有人不知道它的好處。伸縮性,可以按需scaleout,比如rabbitmq的node可以很方便的加入。最終一致性解決了分散式系統的CAP定理的問題。
2.AMQP routing key的設計
AMQP協議中約定了routing key的設計和交互。為了實現訂閱發佈功能,我們需要某種方式能夠訂閱自己所感興趣的事件。所以在AMQP中的Binding中,可以根據routing key來進行模式匹配。所以,這裡可以結合amqp routingkey與領域事件,發出來的事件就相當於amqp中的routingkey,這樣可以完美的結合起來。
你的事件肯定是隨著業務發展逐漸增加的,而這個事件集合也沒辦法在一開始就定義清楚,所以這裡有一個需要註意的就是,綁定的時候千萬不要寫死具體的routing key。比如,order.delivery.OnDeliveryEvented,這是訂單配送,此時你Binding的時候routingkey就寫成了”order.delivery.OnDeliveryEvented”。未來訂單事件一擴展,就會很麻煩,不相關的事件都被訂閱到,無法細化或者事件你無法獲取到,因為routingkey改變了。所以在綁定的時候記住具體點綁定,也就是藉助字元串的模式匹配綁定,比如,*.delivery.*,*.onDeliveryEvented”這樣。將來越來越多的routingkey和event出來都不會影響你的綁定。你只需要根據自己的關心程度,綁定在事件的不同層級上即可。
上圖中,orderBinding綁定了order事件,它訂閱了頂級事件,也就是說未來任何類型的訂單都可以被訂閱到,比如,order.normalorder.delivery.onDeliveryEvent也可以被訂閱到。而viporderBinding訂閱了viporder事件,如果發送了一個order.normalorder.delivery.onDeliveryEvent就跟它沒關係了。
3.RabbitMQ cluster搭建
搞清楚了應用架構的事情,我們開始著手搭建RabbitMQ cluster。rabbitmq這款AMQP產品是用erlang開發的,那麼我們稍微介紹下erlang。
我第一次正式接觸erlang就是從rabbitmq開始的,一開始並沒有太多感覺到特別的地方,後來才明白越明白越發現挺喜歡這門語言的。喜歡的理由就是,它是天然的分散式語言。這句話說起來好像挺平常的,但是當你明白了.erlang.cookie機制之後才恍然大悟。瞬間頓悟了,為什麼要用erlang來搞rabbitmq,而是它真的很適合信息交換之類的軟體。erlang是愛立信公司開發的專門用來開發高性能信息交換機的,想想也會覺得那些軟體的性能和穩定性要求是極高的。RabbitMQ的節點發現和互連真的很方便,這在erlang的虛擬機中就集成了,而且具有高度容錯能力。反正我對它很有好感。
還有一點值得驕傲的是RabbitMQ是偉大的pivotal公司的,你應該知道pivotal公司是乾什麼的,如果你還不清楚建議你立刻google下。
一開始我並沒有太關註他們的copyright,後來對pivotal公司越來越佩服之後突然看到原來RabbitMQ也是他們家的,突然信心倍增。這就是影響力和口碑,看看人家公司的spring、springboot、spring cloud,佩服的五體投地。(RabbtiMQ 官網:http://www.rabbitmq.com/)
3.1.安裝erlang & RabbitMQ
要想安裝RabbitMQ,首先需要安裝和配置好它的宿主環境erlang。去erlang官網下載好erlang otp_src源碼包,然後在本地執行源碼安裝。(erlang官網:http://www.erlang.org/)
由於我本機已經下載好了otp_src源碼包,我是使用的otp_src_19.1版本。下載好之後解壓縮,然後進入目錄,執行./configure --prefix=/usr/erlang/,進行環境的檢查和安裝路徑的選擇。如果你提示“No curses library functions found”錯誤,是因為缺少curses庫,yum install –y ncurses-devel。安裝後在進行configure。
如果沒有報錯的話,就說明安裝成功了。你還需要配置下環境變數:
export PATH=$PATH:/usr/erlang/bin
source /etc/profile
此時使用erl命令檢查下erlang是否能正常工作了。
接下來安裝RabbitMQ,去官網下載運行的包就行了。
同樣要配置下環境變數,這樣你的命令才能被系統查找到。然後運行rabbitmq實例。
這裡有一個需要註意,記得配置下hosts,在127.0.0.1裡加上本機的名稱。erlang進程需要host來進行連接,所以它會檢查你的hosts配置。還需要設置下防火牆,三個埠要打開。15672是管理界面用的,25672是集群之間使用的埠,4369是erlang進程epmd用來做node連接的。
我配置了兩個節點,192.168.0.105、192.168.0.107,現在已經全部就緒。我們添加原始賬號進入rabbitmq管理界面。
3.2.配置RabbitMQ cluster
先保證你的各個rabbitmq節點都是可以訪問的,且打開rabbitmq_management plugin,這樣可以當出現某個節點掛掉之後可以切換到其他管理界面查看情況或者管理。
打開管理界面插件:
rabbitmq-plugins enable rabbitmq_management
添加賬號:
rabbitmqctl add_user admin admin
添加 許可權tag
rabbitmqctl set_user_tags admin administrator
保證兩個節點都是可以正常工作的。下麵我們就將這兩個節點連接起來形成高可用的cluster,這樣我們就可以讓我們的exchange、queue在這兩個節點之間複製,形成高可用的queue。
cd 到你的home目錄下,我是在root下,裡面有一個隱藏的.erlang.cookie文件,這就是我在前面介紹erlang時候提到的,這個文件是erlang用來發現和互連的基礎。我們需要做的很簡單,將兩個節點中的.erlang.cookie設置成一樣的。這是erlang的約定,一樣的cookie hash key他認為是合法和正確的連接。
.erlang.cookie預設是只讀的,你需要修改下寫入許可權,然後複製粘貼下cookie 字元串即可。
chmod u+w .erlang.cookie
配置好了之後接下來配置hosts文件,erlang會使用hosts文件里的配置去發現節點。
vim /etc/hosts
192.168.0.107 rabbitmq_node2
192.168.0.105 rabbitmq_node1
保證同樣的配置在所有的節點上都是相同的。驗證你配置的正確不正確你只需要在你的機器上ping rabbitmq_node1,試下請求的ip是不是你配置的即可。按照DNS的請求原理,hosts是最高優先權,除非瀏覽器有緩存,你直接用ping就不會有問題的。
選擇一個節點stop,然後連接到另外節點。
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq_node2
Clustering node rabbit@rabbitmq_node1 with rabbit@rabbitmq_node2 ...
rabbitmqctl start_app
節點已經連接成功。
預設情況下節點占用的memory是總記憶體的40%,可以根據自己的用途仔細研究rabbitmq的配置項。為了提高性能,不需要兩個節點都是disc的節點,所以我們需要啟動一個節點為RAM模式。
rabbitmqctl change_cluster_node_type ram
改變rabbitmq_node1為記憶體節點模式。
4.Mirror queue policy設置
節點是準備好了,接下來我們需要設置exchange、queue 高可用策略,這樣才能真的做到高可用。現在是物理上的機器或者說虛擬機節點是高可用的,但是裡面的對象需要我們進行配置策略。
RabbitMQ支持很好的策略模式,需要管理員才能操作。
首先我們需要創建一個屬於自己業務範圍內的vhost,標示一個邏輯上的獨立空間,所有的賬號、策略、隊列都是強制在某個虛擬機里的。我創建了一個common vhost。
開始添加policie。
最主要是Apply to ,可以作用在exchange或者queues上,當然也可以包含這兩個。策略選擇還是比較豐富的,最常用的是HAmode,還有MessageTTL(消息的過期時間)。這些策略按照幾個維度分組了,有跟高可用相關的,有Federation(集群之間同步消息)相關的 ,有Queue相關的,還有Exchange相關的。可以根據的業務場景進行調整。
我們定義了策略的匹配模式.order.,這樣可以避免將所有的exchange、queue都鏡像了。
我們新建了一個ex.order.topic exchange,它的features中應用了exchange_queue_ha策略。(相同的策略是無法疊加使用的。)其他的exchange並沒有應用這個策略,是因為我們的pattern限定了只匹配.order.的名稱。
創建一個qu.order.crm queue,註意看它的node屬性里有一個”Synchronised mirrors:rabbit@rabbitmq_node2“鏡像複製。features里也應用了exchange_queue_ha策略。這個時候,隊列其實在兩個節點里都是有的,雖然我們創建的時候是在rabbit@rabbitmq_node1里的,但是它會複製到集群里的其他節點。在創建HAmode的時候可以提供HA params參數,來限定複製節點的個數,這通常用來提高性能和HA之間的平衡。
5.兩個不錯的RabbitMQ plugin 大型應用插件(Sharding、Rederation)
在rabbitmq-plugins中有兩個plugin還是可以試著研究研究的。rabbitmq-plugins list。
rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@rabbitmq_node1
|/
[e*] amqp_client 3.6.5
[ ] cowboy 1.0.3
[ ] cowlib 1.0.1
[e*] mochiweb 2.13.1
[ ] rabbitmq_amqp1_0 3.6.5
[ ] rabbitmq_auth_backend_ldap 3.6.5
[ ] rabbitmq_auth_mechanism_ssl 3.6.5
[ ] rabbitmq_consistent_hash_exchange 3.6.5
[ ] rabbitmq_event_exchange 3.6.5
[ ] rabbitmq_federation 3.6.5
[ ] rabbitmq_federation_management 3.6.5
[ ] rabbitmq_jms_topic_exchange 3.6.5
[E*] rabbitmq_management 3.6.5
[e*] rabbitmq_management_agent 3.6.5
[ ] rabbitmq_management_visualiser 3.6.5
[ ] rabbitmq_mqtt 3.6.5
[ ] rabbitmq_recent_history_exchange 1.2.1
[ ] rabbitmq_sharding 0.1.0
[ ] rabbitmq_shovel 3.6.5
[ ] rabbitmq_shovel_management 3.6.5
[ ] rabbitmq_stomp 3.6.5
[ ] rabbitmq_top 3.6.5
[ ] rabbitmq_tracing 3.6.5
[ ] rabbitmq_trust_store 3.6.5
[e*] rabbitmq_web_dispatch 3.6.5
[ ] rabbitmq_web_stomp 3.6.5
[ ] rabbitmq_web_stomp_examples 3.6.5
[ ] sockjs 0.3.4
[e*] webmachine 1.10.3
rabbitmq_sharding、rabbitmq_federation,rabbitmq_sharding的版本有點低了,github地址:https://github.com/rabbitmq/rabbitmq-sharding
Rederation 可以用來進行跨cluster或者node之間同步消息。http://www.rabbitmq.com/federated-exchanges.html
這個用來在不同的domain之間傳遞消息還是個不錯的解決方案,跨機房或者跨網路區域,訂閱別人的rabbitmq消息始終不太穩定,可以用這種方式來傳遞消息。
6.Queue鏡像失敗手動同步
有時候可能由於各種原因導致queue mirror失敗,這個時候可以手動進行同步,而不是像其他分散式系統來重啟節點或者重建數據。
這個還是比較方便的,有時候總有那麼幾個小問題需要你手動處理的。
7.各集群配置同步方式(RabbitMQ export\import)
各個環境的集群配置同步也是個日常運維的問題,還好RabbitMQ也提供了相關工具。
8.客戶端連接方式(儘量採用AMQP組來動態鏈接)
由於RabbitMQ是AMQP協議的實現,所以在進行遠程連接的時候儘量採用amqp協議的方式連接。
var amqpList = new List<AmqpTcpEndpoint>
{
new AmqpTcpEndpoint(new Uri("amqp://192.168.0.105:5672")),
new AmqpTcpEndpoint(new Uri("amqp://192.168.0.107:5672"))
};
關於集群的vip方案其實也是需要綜合考慮的,如果是統一的地址會面臨三個問題,DNS、LoadBalance、VIP,這三個點都有可能導致集群連接不上。現在越來越多的方案傾向於在客戶端做負載和故障轉移,這有很多好處,消除了中間節點帶來的故障概率。如果這三個點加在一起出現的可用性指標肯定是比直接在客戶端連接的低的多。
我們碰到最多就是VIP的問題,這類系統的VIP不同於資料庫,資料庫的master\slave大多都是要人工check後才切換,不會隨便自動的切換主從庫。而非資料庫的VIP大多都是Keepalived自動檢測切換,這帶來一些列問題,包括連接重試、心跳保持。這隻是VIP的出錯場景之一。還有LoadBalance帶來的問題,DNS出錯的可能性也是很大。所以我傾向於使用客戶端來做這些。
有幾個地方很重要,第一個就是消息的Persistent持久化狀態要帶上,第二個就是ContentType,這個屬性很實用,方便你查看消息的正文。
如果沒設置,預設是null。
第三個就是AutomaticRecoveryEnabled,自動連接重試,這致命重要。當上面的VIP切換之後這個可以保命。第四個就是TopologyRecoveryEnabled,重新恢復Exchange、queue、binding。在出現網路斷開之後,一旦恢復連接就會恢復這些設置以保證是最新的設置。
9.RabbitMQ 產線二次產品化封裝(消息補償、發送消息持久化、異常處理、監控頁面、重覆消息剔除)
不管rabbitmq保證的多麼強壯,多麼高可用,記住一定要有備用方案。
在之前我寫了一篇文章,WebAPi的可視化輸出模式(RabbitMQ、消息補償相關)——所有webapi似乎都缺失的一個功能
說了就是消息的持久化和補償。
一旦將發送和接受的消息持久化之後我們能做到事情就比較多了。消息補償是可以做的,異常也不用擔心。但是在發送消息的時候一定要註意,是先持久化消息在業務邏輯處理。為了應對特殊活動的監控,還可以開發一定的業務來監控消息的接受和處理的數量,然後自動補償。
在開發補償程式的時候有一個邏輯挺饒人的,當你對某一個消息進行補償的時候會多出發送消息,而接受的消息肯定是比你發送的少。所以你在統計的時候記得DISTINCT下。
相關文章:
封裝RabbitMQ.NET Library 的一點經驗總結
WebAPi的可視化輸出模式(RabbitMQ、消息補償相關)——所有webapi似乎都缺失的一個功能
作者:王清培
出處:http://www.cnblogs.com/wangiqngpei557/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面