為什麼會需要消息隊列(MQ)? ########################################################################################## 主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的i ...
為什麼會需要消息隊列(MQ)?
##########################################################################################
主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導致無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。通過使用消息隊列,我們可以非同步處理請求,從而緩解系統的壓力。
##########################################################################################
美國電腦科學家,LaTex的作者Leslie Lamport說:“分散式系統就是這樣一個系統,系統中一個你甚至都不知道的電腦出了故障,卻可能導致你自己的電腦不可用。”一語道破了開發分散式系統的玄機,那就是它的複雜與不可控。所以Martin Fowler強調:分散式調用的第一原則就是不要分散式。這句話看似頗具哲理,然而就企業應用系統而言,只要整個系統在不停地演化,並有多個子系統共同存在時,這條原則就會被迫打破。蓋因為在當今的企業應用系統中,很難尋找到完全不需要分散式調用的場景。Martin Fowler提出的這條原則,一方面是希望設計者能夠審慎地對待分散式調用,另一方面卻也是分散式系統自身存在的缺陷所致。無論是CORBA,還是EJB 2;無論是RPC平臺,還是Web Service,都因為駐留在不同進程空間的分散式組件,而引入額外的複雜度,並可能對系統的效率、可靠性、可預測性等諸多方面帶來負面的影響。
然而,不可否認的是在企業應用系統領域,我們總是會面對不同系統之間的通信、集成與整合,尤其當面臨異構系統時,這種分散式的調用與通信變得越重要,它在架構設計中就更加凸顯其價值。並且,從業務分析與架構質量的角度來講,我們也希望在系統架構中儘可能地形成對服務的重用,通過獨立運行在進程中服務的形式,徹底解除客戶端與服務端的耦合。這常常是架構演化的必然道路。在我的同事陳金洲發表在InfoQ上的文章《架構腐化之謎》中,就認為可以通過“將獨立的模塊放入獨立的進程”來解決架構因為代碼規模變大而腐化的問題。
隨著網路基礎設施的逐步成熟,從RPC進化到Web Service,併在業界開始普遍推行SOA,再到後來的RESTful平臺以及雲計算中的PaaS與SaaS概念的推廣,分散式架構在企業應用中開始呈現出不同的風貌,然而殊途同歸,這些分散式架構的目標仍然是希望回到建造巴別塔的時代,系統之間的交流不再為不同語言與平臺的隔閡而產生障礙。正如Martin Fowler在《企業集成模式》一書的序中寫道:“集成之所以重要是因為相互獨立的應用是沒有生命力的。我們需要一種技術能將在設計時並未考慮互操作的應用集成起來,打破它們之間的隔閡,獲得比單個應用更多的效益”。這或許是分散式架構存在的主要意義。
1、集成模式中的消息模式
歸根結底,企業應用系統就是對數據的處理,而對於一個擁有多個子系統的企業應用系統而言,它的基礎支撐無疑就是對消息的處理。與對象不同,消息本質上是一種數據結構(當然,對象也可以看做是一種特殊的消息),它包含消費者與服務雙方都能識別的數據,這些數據需要在不同的進程(機器)之間進行傳遞,並可能會被多個完全不同的客戶端消費。在眾多分散式技術中,消息傳遞相較文件傳遞與遠程過程調用(RPC)而言,似乎更勝一籌,因為它具有更好的平臺無關性,並能夠很好地支持併發與非同步調用。對於Web Service與RESTful而言,則可以看做是消息傳遞技術的一種衍生或封裝。在《面向模式的軟體架構(捲四)》一書中,將關於消息傳遞的模式劃歸為分散式基礎設施的範疇,這是因為諸多消息中間件產品的出現,使得原來需要開發人員自己實現的功能,已經可以直接重用。這極大地降低了包括設計成本、實現成本在內的開發成本。因此,對於架構師的要求也就從原來的設計實現,轉變為對業務場景和功能需求的判斷,從而能夠正確地進行架構決策、技術選型與模式運用。
常用的消息模式
在我參與過的所有企業應用系統中,無一例外地都採用(或在某些子系統與模塊中部分採用)了基於消息的分散式架構。但是不同之處在於,讓我們做出架構決策的證據卻迥然而異,這也直接影響我們所要應用的消息模式。
消息通道(Message Channel)模式
我們常常運用的消息模式是Message Channel(消息通道)模式,如圖1所示。
圖1 Message Channel模式(圖片來自eaipatterns )
消息通道作為在客戶端(消費者,Consumer)與服務(生產者,Producer)之間引入的間接層,可以有效地解除二者之間的耦合。只要實現規定雙方需要通信的消息格式,以及處理消息的機制與時機,就可以做到消費者對生產者的“無知”。事實上,該模式可以支持多個生產者與消費者。例如,我們可以讓多個生產者向消息通道發送消息,因為消費者對生產者的無知性,它不必考慮究竟是哪個生產者發來的消息。
雖然消息通道解除了生產者與消費者之間的耦合,使得我們可以任意地對生產者與消費者進行擴展,但它又同時引入了各自對消息通道的依賴,因為它們必須知道通道資源的位置。要解除這種對通道的依賴,可以考慮引入Lookup服務來查找該通道資源。例如,在JMS中就可以通過JNDI來獲取消息通道Queue。若要做到充分的靈活性,可以將與通道相關的信息存儲到配置文件中,Lookup服務首先通過讀取配置文件來獲得通道。
消息通道通常以隊列的形式存在,這種先進先出的數據結構無疑最為適合這種處理消息的場景。微軟的MSMQ、IBM MQ、JBoss MQ以及開源的RabbitMQ、Apache ActiveMQ都通過隊列實現了Message Channel模式。因此,在選擇運用Message Channel模式時,更多地是要從質量屬性的層面對各種實現了該模式的產品進行全方位的分析與權衡。例如,消息通道對併發的支持以及在性能上的表現;消息通道是否充分地考慮了錯誤處理;對消息安全的支持;以及關於消息持久化、災備(fail over)與集群等方面的支持。因為通道傳遞的消息往往是一些重要的業務數據,一旦通道成為故障點或安全性的突破點,對系統就會造成災難性的影響。在本文的第二部分,我將給出一個實際案例來闡釋在進行架構決策時應該考慮的架構因素,並由此做出正確地決策。
發佈者-訂閱者(Publisher-Subscriber)模式
一旦消息通道需要支持多個消費者時,就可能面臨兩種模型的選擇:拉模型與推模型。拉模型是由消息的消費者發起的,主動權把握在消費者手中,它會根據自己的情況對生產者發起調用。如圖2所示:
圖2 拉模型
拉模型的另一種體現則由生產者在狀態發生變更時,通知消費者其狀態發生了改變。但得到通知的消費者卻會以回調方式,通過調用傳遞過來的消費者對象獲取更多細節消息。
在基於消息的分散式系統中,拉模型的消費者通常以Batch Job的形式,根據事先設定的時間間隔,定期偵聽通道的情況。一旦發現有消息傳遞進來,就會轉而將消息傳遞給真正的處理器(也可以看做是消費者)處理消息,執行相關的業務。在本文第二部分介紹的醫療衛生系統,正是通過引入Quartz.NET實現了Batch Job,完成對消息通道中消息的處理。
推模型的主動權常常掌握在生產者手中,消費者被動地等待生產者發出的通知,這就要求生產者必須瞭解消費者的相關信息。如圖3所示:
圖3 推模型
對於推模型而言,消費者無需瞭解生產者。在生產者通知消費者時,傳遞的往往是消息(或事件),而非生產者自身。同時,生產者還可以根據不同的情況,註冊不同的消費者,又或者在封裝的通知邏輯中,根據不同的狀態變化,通知不同的消費者。
兩種模型各有優勢。拉模型的好處在於可以進一步解除消費者對通道的依賴,通過後臺任務去定期訪問消息通道。壞處是需要引入一個單獨的服務進程,以Schedule形式執行。而對於推模型而言,消息通道事實上會作為消費者觀察的主體,一旦發現消息進入,就會通知消費者執行對消息的處理。無論推模型,拉模型,對於消息對象而言,都可能採用類似Observer模式的機制,實現消費者對生產者的訂閱,因此這種機制通常又被稱為Publisher-Subscriber模式,如圖4所示:
圖4 Publisher-Subscriber模式(圖片來自eaipatterns )
通常情況下,發佈者和訂閱者都會被註冊到用於傳播變更的基礎設施(即消息通道)上。發佈者會主動地瞭解消息通道,使其能夠將消息發送到通道中;消息通道一旦接收到消息,會主動地調用註冊在通道中的訂閱者,進而完成對消息內容的消費。
對於訂閱者而言,有兩種處理消息的方式。一種是廣播機制,這時消息通道中的消息在出列的同時,還需要複製消息對象,將消息傳遞給多個訂閱者。例如,有多個子系統都需要獲取從CRM系統傳來的客戶信息,並根據傳遞過來的客戶信息,進行相應的處理。此時的消息通道又被稱為Propagation通道。另一種方式則屬於搶占機制,它遵循同步方式,在同一時間只能有一個訂閱者能夠處理該消息。實現Publisher-Subscriber模式的消息通道會選擇當前空閑的唯一訂閱者,並將消息出列,並傳遞給訂閱者的消息處理方法。
目前,有許多消息中間件都能夠很好地支持Publisher-Subscriber模式,例如JMS介面規約中對於Topic對象提供的MessagePublisher與MessageSubscriber介面。RabbitMQ也提供了自己對該模式的實現。微軟的MSMQ雖然引入了事件機制,可以在隊列收到消息時觸發事件,通知訂閱者。但它並非嚴格意義上的Publisher-Subscriber模式實現。由微軟MVP Udi Dahan作為主要貢獻者的NServiceBus,則對MSMQ以及WCF做了進一層包裝,並能夠很好地實現這一模式。
消息路由(Message Router)模式
無論是Message Channel模式,還是Publisher-Subscriber模式,隊列在其中都扮演了舉足輕重的角色。然而,在企業應用系統中,當系統變得越來越複雜時,對性能的要求也會越來越高,此時對於系統而言,可能就需要支持同時部署多個隊列,並可能要求分散式部署不同的隊列。這些隊列可以根據定義接收不同的消息,例如訂單處理的消息,日誌信息,查詢任務消息等。這時,對於消息的生產者和消費者而言,並不適宜承擔決定消息傳遞路徑的職責。事實上,根據S單一職責原則,這種職責分配也是不合理的,它既不利於業務邏輯的重用,也會造成生產者、消費者與消息隊列之間的耦合,從而影響系統的擴展。
既然這三種對象(組件)都不宜承擔這樣的職責,就有必要引入一個新的對象專門負責傳遞路徑選擇的功能,這就是所謂的Message Router模式,如圖5所示:
圖5 Message Router模式(圖片來自eaipatterns )
通過消息路由,我們可以配置路由規則指定消息傳遞的路徑,以及指定具體的消費者消費對應的生產者。例如指定路由的關鍵字,並由它來綁定具體的隊列與指定的生產者(或消費者)。路由的支持提供了消息傳遞與處理的靈活性,也有利於提高整個系統的消息處理能力。同時,路由對象有效地封裝了尋找與匹配消息路徑的邏輯,就好似一個調停者(Meditator),負責協調消息、隊列與路徑定址之間關係。
除了以上的模式之外,Messaging模式提供了一個通信基礎架構,使得我們可以將獨立開發的服務整合到一個完整的系統中。 Message Translator模式則完成對消息的解析,使得不同的消息通道能夠接收和識別不同格式的消息。而且通過引入這樣的對象,也能夠很好地避免出現盤根錯節,彼此依賴的多個服務。Message Bus模式可以為企業提供一個面向服務的體系架構。它可以完成對消息的傳遞,對服務的適配與協調管理,並要求這些服務以統一的方式完成協作。
2、消息模式的應用場景
基於消息的分散式架構總是圍繞著消息來做文章。例如可以將消息封裝為對象,或者指定消息的規範例如SOAP,或者對實體對象的序列化與反序列化。這些方式的目的只有一個,就是將消息設計為生產者和消費者都能夠明白的格式,並能通過消息通道進行傳遞。
場景一:基於消息的統一服務架構
在製造工業的CIMS系統中,我們嘗試將各種業務以服務的形式公開給客戶端的調用者,例如定義這樣的介面:
public interface IService { IMessage Execute(IMessage aMessage); void SendRequest(IMessage aMessage); }
之所以能夠設計這樣的服務,原因在於我們對業務信息進行了高度的抽象,以消息的形式在服務之間傳遞。此時的消息其實是生產者與消費者之間的契約或介面,只要遵循這樣的契約,按照規定的格式對消息進行轉換與抽取,就能很好地支持系統的分散式處理。
在這個CIMS系統中,我們將消息劃分為ID,Name和Body,通過定義如下的介面方法,可以獲得消息主體的相關屬性:
public interface IMessage:ICloneable { string MessageID { get; set; } string MessageName() { get; set; } IMessageItemSequence CreateMessageBody(); IMessageItemSequence GetMessageBody(); }
消息主體類Message實現了IMessage介面。在該類中,消息體Body為IMessageItemSequence類型。這個類型用於獲取和設置消息的內容:Value和Item:
public interface IItemValueSetting { string getSubValue(string name); void setSubValue(string name, string value); } public interface IMessageItemSequence:IItemValueSetting, ICloneable { IMessageItem GetMessageItem(string aName); IMessageItem CreateMessageItem(string aName); }
Value為字元串類型,它利用了HashTable存儲Key和Value的鍵值對。Item則為IMessageItem類型,在IMessageItemSequence的實現類中,同樣利用了HashTable存儲Key和Item的鍵值對。
IMessageItem支持消息體的嵌套。它包含了兩部分:SubValue和SubItem。實現的方式和IMessageItemSequence相似。通過定義這樣的嵌套結構,使得消息的擴展成為可能。一般的消息結構如下所示:
IMessage——Name ——ID ——Body(IMessageItemSequence) ——Value ——Item(IMessageItem) ——SubValue ——SubItem(IMessageItem) ——……
各個消息對象之間的關係如圖6所示:
圖6 消息對象之間的關係
在實現服務進程通信之前,我們必須定義好各個服務或各個業務的消息格式。通過消息體的方法在服務的一端設置消息的值,然後發送,併在服務的另一端獲得這些值。例如發送消息端定義如下的消息體:
IMessageFactory factory = new MessageFactory(); IMessage message = factory.CreateMessage(); message.SetMessageName("service1"); IMessageItemSequence body = message.CreateMessageBody(); body.SetSubValue("subname1","subvalue1"); body.SetSubValue("subname2","subvalue2"); IMessageItem item1 = body.CreateMessageItem(”item1”); item1.SetSubValue("subsubname11","subsubvalue11"); item1.SetSubValue("subsubname12","subsubvalue12"); //Send Request Message MyServiceClient service = new MyServiceClient("Client"); service.SendRequest(message);
我們在客戶端引入了一個ServiceLocator對象,它通過MessageQueueListener對消息隊列進行偵聽,一旦接收到消息,就獲取該消息中的name去定位它所對應的服務,然後調用服務的Execute(aMessage)方法,執行相關的業務。
ServiceLocator承擔的定位職責其實是對存儲在ServiceContainer容器中的服務進行查詢。ServiceContainer容器可以讀取配置文件,在啟動服務的時候初始化所有的分散式服務(註意,這些服務都是無狀態的),並對這些服務進行管理。它封裝了服務的基本信息,諸如服務所在的位置,服務的部署方式等,從而避免服務的調用者直接依賴於服務的細節,既減輕了調用者的負擔,還能夠較好地實現服務的擴展與遷移。
在這個系統中,我們主要引入了Messaging模式,通過定義的IMessage介面,使得我們更好地對服務進行抽象,並以一種扁平的格式存儲數據信息,從而解除服務之間的耦合。只要各個服務就共用的消息格式達成一致,請求者就可以不依賴於接收者的具體介面。通過引入的Message對象,我們就可以建立一種在行業中通用的消息模型與分散式服務模型。事實上,基於這樣的一個框架與平臺,在對製造行業的業務進行開發時,開發人員最主要的活動是與領域專家就各種業務的消息格式進行討論,這樣一種面向領域的消息語言,很好地掃清了技術人員與業務人員的溝通障礙;同時在各個子系統之間,我們也只需要維護服務間相互傳遞的消息介面表。每個服務的實現都是完全隔離的,有效地做到了對業務知識與基礎設施的合理封裝與隔離。
對於消息的格式和內容,我們考慮引入了Message Translator模式,負責對前面定義的消息結構進行翻譯和解析。為了進一步減輕開發人員的負擔,我們還可以基於該平臺搭建一個消息-對象-關係的映射框架,引入實體引擎(Entity Engine)將消息轉換為領域實體,使得服務的開發者能夠以完全面向對象的思想開發各個服務組件,並通過調用持久層實現消息數據的持久化。同時,利用消息匯流排(此時的消息匯流排可以看做是各個服務組件的連接器)連接不同的服務,並允許非同步地傳遞消息,對消息進行編碼。這樣一個基於消息的分散式架構如圖7所示:
圖7 基於Message Bus的CIMS分散式架構
場景二:消息中間件的架構決策
在一個醫療衛生系統中,我們面臨了客戶對系統性能/可用性的非功能需求。在我們最初啟動該項目時,客戶就表達了對性能與可用性的特別關註。客戶希望最終用戶在進行複雜的替換刪除操作時,能夠具有很好的用戶體驗,簡言之,就是希望能夠快速地得到操作的響應。問題在於這樣的替換刪除操作需要處理比較複雜的業務邏輯,同時牽涉到的關聯數據量非常大,整個操作若需完成,最壞情況下可能需要幾分鐘的時間。我們可以通過引入緩存、索引、分頁等多種方式對資料庫操作進行性能調優,但整個操作的耗時始終無法達到客戶的要求。由於該系統是在一個遺留系統的基礎上開發,如果要引入Map-Reduce來處理這些操作,以滿足質量需求,則對架構的影響太大,且不能很好地重用之前系統的某些組件。顯然,付出的成本與收益並不成正比。
通過對需求進行分析,我們註意到最終客戶並不需要實時獲得結果,只要能夠保證最終結果的一致性和完整性即可。關鍵在於就用戶體驗而言,他們不希望經歷漫長的等待,然後再通知他們操作究竟是成功還是失敗。這是一個典型需要通過後臺任務進行非同步處理的場景。
在企業應用系統中,我們常常會遭遇這樣的場景。我們曾經在一個金融系統中嘗試通過自己編寫任務的方式來控制後臺線程的併發訪問,並完成對任務的調度。事實證明,這樣的設計並非行之有效。對於這種典型的非同步處理來說,基於消息傳遞的架構模式才是解決這一問題的最佳辦法。
因為消息中間件的逐步成熟,對於這一問題的架構設計,已經由原來對設計實現的關註轉為如何進行產品選型和技術決策。例如,在.NET平臺下,架構師需要重點考慮的是應該選擇哪種消息中間件來處理此等問題?這就需要我們必須結合具體的業務場景,來識別這種非同步處理方式的風險,然後再根據這些風險去比較各種技術,以求尋找到最適合的方案。
通過分析業務場景以及客戶性質,我們發現該業務場景具有如下特征:
- 在一些特定情形下,可能會集中發生批量的替換刪除操作,使得操作的併發量達到高峰;例如FDA要求召回一些違規藥品時,就需要刪除藥品庫中該藥品的信息;
- 操作結果不要求實時性,但需要保證操作的可靠性,不能因為異常失敗而導致某些操作無法進行;
- 自動操作的過程是不可逆轉的,因此需要記錄操作歷史;
- 基於性能考慮,大多數操作需要調用資料庫的存儲過程;
- 操作的數據需要具備一定的安全性,避免被非法用戶對數據造成破壞;
- 與操作相關的功能以組件形式封裝,保證組件的可重用性、可擴展性與可測試性;
- 數據量可能隨著最終用戶的增多而逐漸增大;
針對如上的業務需求,我們決定從以下幾個方面對各種技術方案進行橫向的比較與考量。
- 併發:選擇的消息隊列一定要很好地支持用戶訪問的併發性;
- 安全:消息隊列是否提供了足夠的安全機制;
- 性能伸縮:不能讓消息隊列成為整個系統的單一性能瓶頸;
- 部署:儘可能讓消息隊列的部署更為容易;
- 災備:不能因為意外的錯誤、故障或其他因素導致處理數據的丟失;
- API易用性:處理消息的API必須足夠簡單、並能夠很好地支持測試與擴展;
我們先後考察了MSMQ、Resque、ActiveMQ和RabbitMQ,通過查詢相關資料,以及編寫Spike代碼驗證相關質量,我們最終選擇了RabbitMQ。
我們選擇放棄MSMQ,是因為它嚴重依賴Windows操作系統;它雖然提供了易用的GUI方便管理人員對其進行安裝和部署,但若要編寫自動化部署腳本,卻非常困難。同時,MSMQ的隊列容量不能查過4M位元組,這也是我們無法接收的。Resque的問題是目前僅支持Ruby的客戶端調用,不能很好地與.NET平臺集成。此外,Resque對消息持久化的處理方式是寫入到Redis中,因而需要在已有RDBMS的前提下,引入新的Storage。我們比較傾心於ActiveMQ與RabbitMQ,但通過編寫測試代碼,採用迴圈發送大數據消息以驗證消息中間件的性能與穩定性時,我們發現ActiveMQ的表現並不太讓人滿意。至少,在我們的詢證調研過程中,ActiveMQ會因為頻繁發送大數據消息而偶爾出現崩潰的情況。相對而言,RabbitMQ在各個方面都比較適合我們的架構要求。
例如在災備與穩定性方面,RabbitMQ提供了可持久化的隊列,能夠在隊列服務崩潰的時候,將未處理的消息持久化到磁碟上。為了避免因為發送消息到寫入消息之間的延遲導致信息丟失,RabbitMQ引入了Publisher Confirm機制以確保消息被真正地寫入到磁碟中。它對Cluster的支持提供了Active/Passive與Active/Active兩種模式。例如,在Active/Passive模式下,一旦一個節點失敗,Passive節點就會馬上被激活,並迅速替代失敗的Active節點,承擔起消息傳遞的職責。如圖8所示:
圖8 Active/Passive Cluster(圖片來自RabbitMQ官方網站)
在併發處理方面,RabbitMQ本身是基於erlang編寫的消息中間件,作為一門面向併發處理的編程語言,erlang對併發處理的天生優勢使得我們對RabbitMQ的併發特性抱有信心。RabbitMQ可以非常容易地部署到Windows、Linux等操作系統下,同時,它也可以很好地部署到伺服器集群中。它的隊列容量是沒有限制的(取決於安裝RabbitMQ的磁碟容量),發送與接收信息的性能表現也非常好。RabbitMQ提供了Java、.NET、Erlang以及C語言的客戶端API,調用非常簡單,並且不會給整個系統引入太多第三方庫的依賴。 例如.NET客戶端只需要依賴一個程式集。
即使我們選擇了RabbitMQ,但仍有必要對系統與具體的消息中間件進行解耦,這就要求我們對消息的生產者與消費者進行抽象,例如定義如下的介面:
public interface IQueueSubscriber { void ListenTo<T>(string queueName, Action<T> action); void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully); void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages); } public interface IQueueProvider { T Pop<T>(string queueName); T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully); T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages); void Push(FunctionalArea functionalArea, string routingKey, object payload); }
在這兩個介面的實現類中,我們封裝了RabbitMQ的調用類,例如:
public class RabbitMQSubscriber : IQueueSubscriber { public void ListenTo<T>(string queueName, Action<T> action) { using (IConnection connection = _factory.OpenConnection()) using (IModel channel = connection.CreateModel()) { var consumer = new QueueingBasicConsumer(channel); string consumerTag = channel.BasicConsume(queueName, AcknowledgeImmediately, consumer); var response = (BasicDeliverEventArgs) consumer.Queue.Dequeue(); var serializer = new JavaScriptSerializer(); string json = Encoding.UTF8.GetString(response.Body); var message = serializer.Deserialize<T>(json); action(message); } } } public class RabbitMQProvider : IQueueProvider { public T Pop<T>(string queueName) { var returnVal = default(T); const bool acknowledgeImmediately = true; using (var connection = _factory.OpenConnection()) using (var channel = connection.CreateModel()) { var response = channel.BasicGet(queueName, acknowledgeImmediately); if (response != null) { var serializer = new JavaScriptSerializer(); var json = Encoding.UTF8.GetString(response.Body); returnVal = serializer.Deserialize<T>(json); } } return returnVal; } }
我們用Quartz.Net來實現Batch Job。通過定義一個實現了IStatefulJob介面的Job類,在Execute()方法中完成對隊列的偵聽。Job中RabbitMQSubscriber類的ListenTo()方法會調用Queue的Dequeue()方法,當接收的消息到達隊列時,Job會偵聽到消息達到的事件,然後以同步的方式使得消息彈出隊列,並將消息作為參數傳遞給Action委托。因此,在Batch Job的Execute()方法中,可以定義消息處理的方法,並調用RabbitMQSubscriber類的ListenTo()方法,如下所示(註意,這裡傳遞的消息事實上是Job的Id):
public void Execute(JobExecutionContext context) { string queueName = queueConfigurer.GetQueueProviders().Queue.Name; try { queueSubscriber.ListenTo<MyJob>( queueName, job => request.MakeRequest(job.Id.ToString())); } catch(Exception err) { Log.WarnFormat("Unexpected exception while processing queue '{0}', Details: {1}", queueName, err); } }
隊列的相關信息例如隊列名都存儲在配置文件中。Execute()方法調用了request對象的MakeRequest()方法,並將獲得的消息(即JobId)傳遞給該方法。它會根據JobId到資料庫中查詢該Job對應的信息,並執行真正的業務處理。
在對基於消息處理的架構進行決策時,除了前面提到的考慮因素外,還需要就許多設計細節進行多方位的判斷與權衡。例如針對Job的執行以及隊列的管理,就需要考慮如下因素:
- 對Queue中Job狀態的監控與查詢;
- 對Job優先順序的管理;
- 能否取消或終止執行時間過長的Job;
- 是否能夠設定Job的執行時間;
- 是否能夠設定Poll的間隔時間;
- 能否跨機器分散式的放入Job;
- 對失敗Job的處理;
- 能否支持多個隊列,命名隊列;
- 能否允許執行Job的工作進程對應特定的隊列;
- 對Dead Message的支持。
3、選擇的時機
究竟在什麼時候,我們應該選擇基於消息處理的分散式架構?根據我參與的多個企業應用系統的經驗,竊以為需要滿足如下幾個條件:
- 對操作的實時性要求不高,而需要執行的任務極為耗時;
- 存在企業內部的異構系統間的整合;
- 伺服器資源需要合理分配與利用;
對於第一種情況,我們常常會選擇消息隊列來處理執行時間較長的任務。此時引入的消息隊列就成了消息處理的緩衝區。消息隊列引入的非同步通信機制,使得發送方和接收方都不用等待對方返回成功消息,就可以繼續執行下麵的代碼,從而提高了數據處理的能力。尤其是當訪問量和數據流量較大的情況下,就可以結合消息隊列與後臺任務,通過避開高峰期對大數據進行處理,就可以有效降低資料庫處理數據的負荷。前面提到的醫療衛生系統正是這樣一種適用場景。
對於不同系統乃至於異構系統的整合,恰恰是消息模式善於處理的場景。只要規定了消息的格式與傳遞方式,就可以有效地實現不同系統之間的通信。在為某汽車製造商開發一個大型系統時,分銷商作為.NET客戶端,需要將數據傳遞到管理中心。這些數據將被Oracle的EBS(E-Business Suite)使用。分銷商管理系統(Dealer Management System,DMS)採用了C/S結構,資料庫為SQL Server,汽車製造商管理中心的EBS資料庫為Oracle 10g。我們需要解決兩種不同資料庫間數據的傳遞。解決方案就是利用MSMQ,將數據轉換為與資料庫無關的消息數據,併在兩端部署MSMQ伺服器,建立消息隊列以便於存儲消息數據。實現架構如圖9所示。
圖10 利用MSMQ實現的分散式處理架構
首先,分銷商的數據通過MSMQ傳遞到MSMQ Server,再將數據插入到SQL Server資料庫的同時,利用FTP將數據傳送到專門的文件伺服器上。EBS App Server會將文件伺服器中的文件,基於介面規範寫入到Oracle資料庫,從而實現.NET系統與Oracle系統之間的整合。
分散式系統通常能夠緩解單個伺服器的壓力,通過將不同的業務操作與數據處理以不同的服務形式部署並運行在不同的伺服器上,就可以有效地分配與利用伺服器資源。在這種情況下,部署在不同伺服器上的服務,既可能作為服務端,用以處理客戶端調用的請求,也可能作為客戶端,在處理完自己的業務後,將其餘業務請求委派給其他服務。在早期的CORBA系統中,通過建立統一的Naming Service,用以管理和分派服務,並通過Event Service實現事件的分發與處理。但CORBA系統採用的是RPC的方式,需要將服務設計和部署為遠程對象,並建立代理。如果通過消息通道的方式,則既可以解除這種對遠程對象的依賴,又可以很好地支持非同步調用模型。在前面提到的CIMS系統,就是通過消息匯流排提供消息傳遞的基礎設施,並建立統一的消息處理服務模型,解除服務見的依賴,使得各個服務能夠獨立地部署到不同伺服器上。
4、面臨的困難
由於消息模式自身的特殊性,我們在運用消息模式建立基於消息的分散式架構時,常常會面臨許多困難。
首先是系統集成的問題。由於系統之間的通信靠消息進行傳遞,就必須保證消息的一致性,同時,還需要維護系統之間(主要是服務之間)介面的穩定性。一旦介面發生變化,就可能影響到該介面的所有調用者。即使服務通過介面進行了抽象,由於消息持有雙方服務規定的業務數據,在一定程度上違背了封裝的要義。換言之,生產與消費消息的雙方都緊耦合於消息。消息的變化會直接影響到各個服務介面的實現類。然而,為了儘可能保證介面的抽象性,我們所要處理的消息都不是強類型的,這就使得我們在編譯期間很難發現因為消息內容發生變更產生的錯誤。在我之前提到的汽車零售商管理系統就存在這樣的問題。當時我負責的CRM模塊需要同時與多個子系統進行通信,而每個子系統又是由不同的團隊進行開發。團隊之間因為溝通原因,常常未能及時地同步介面表。雖然各個子系統的單元測試和功能測試都已通過,但直到對CRM進行集成測試,才發現存在大量消息不匹配的集成問題,這些問題的起因都是因為消息的變更。
解決的方案是引入充分的集成測試,甚至是回歸測試,並需要及時運行這些測試,以快速地獲得反饋。我們可以將集成測試作為提交代碼的驗證們,要求每次提交代碼都必須運行集成測試與指定的回歸測試 。這正是持續集成的體現。通過在本地構建與遠程構建運行集成測試與回歸測試,有效地保證本地版本與集成後的版本不會因為消息的改變使得功能遭受破壞。一旦遭受破壞,也能夠及時獲得反饋,發現問題,即刻解決這些問題,而不是等到項目後期集中進行集成測試。
另一個問題是後臺任務的非實時性帶來的測試困難。由於後臺任務是定期對消息隊列中的消息進行處理,因而觸發的時機是不可預測的 。對於這種情況,我們通常會同時運用兩種方案,雙管其下地解決問題。首先,我們會為系統引入一個同步實現功能的版本,並通過在配置文件中引入toggle的開關機制,隨時可以在同步功能與非同步功能之間進行切換。如果我們能夠保證消息隊列處理與後臺任務執行的正確性,就可以設置為同步功能,這樣就能快速而準確地對該任務所代表的功能進行測試,並及時收穫反饋。同時,我們可以在持續集成伺服器上建立一個專門的管道(pipeline),用以運行基於消息處理的非同步版本。這個管道對應的任務可以通過手動執行,也可以對管道設置定時器,在指定時間執行(例如在凌晨兩點執行一次,這樣在第二天開始工作之前可以獲得反饋)。我們需要為該管道準備特定的執行環境,並將後臺任務的偵聽與執行時間修改為可以接受的值。這樣既能夠及時瞭解功能是否正確,又能保證基於消息的系統是工作正常的。
當然,分散式系統還存在解析消息、網路傳遞的性能損耗。對於這些問題,需要架構師審慎地分析業務場景,正確地選擇架構方案與架構模式。相比較本地系統而言,分散式系統的維護難度可能成倍遞增。這既需要我們在進行架構決策與設計時,充分考慮系統架構的穩定性,同時還需要引入系統日誌處理。更好的做法是為日誌處理增加錯誤通知的功能,只要發生消息處理的錯誤信息,就通過郵件、簡訊等方式通知系統管理員,及時地處理錯誤。因為只有在發生錯誤的當時查詢錯誤日誌,才能夠更好對問題進行定位。同時,還可以為系統引入Error Message Queue以及Dead Message Queue,以便於處理錯誤和異常情況。
對於分散式系統而言,還需要考慮服務執行結果的一致性,尤其是當某個業務需要多個服務參與到一個會話中時,一旦某個服務發生故障,就可能導致應用出現狀態不一致的情況,因為只有所有參與者都成功執行了任務,才能視為完全成功。這就牽涉到分散式事務的問題,此時任務的執行就變成了事務型的:即任務必須是原子的,結果狀態必須保持一致。在任務處理過程中,狀態修改是彼此隔離的,成功的狀態修改在整個事務執行過程中是持久的。這就是事務的ACID(Atomic,Consistent,Isolated與Durable)屬性。
一種方案是引入分散式事務協調器,即DTC(Distributed Transaction Coordinator),將事務分為兩段式甚至三段式提交,要求整個事務的所有參與者以投票形式決定事務是完全成功還是失敗。另一種方案是降低對結果一致性的要求。根據eBay的最佳實踐,考慮到分散式事務的成本,獲得分散式資源即時的一致性是不必要的,也是不現實的。在Randy Shoup的文章《可伸縮性最佳實踐:來自eBay的經驗》中提到了Eric Brewer的CAP公理:分散式系統的三項重要指標——一致性(Consistency)、可用性(Availability)和 分區耐受性(Partition-tolerance)——在任意時刻,只有兩項能同時成立。我們應該根據不同的應用場景,權衡這三個要素。在不必要保證即時的一致性前提下,我們可以考慮合理地劃分服務,儘量將可能作用在同一個事務範圍的業務操作部署在同一個進程中,以避免分散式部署。如果確實需要多個分散式服務之間保持執行結果的一致,可以考慮引入數據核對,非同步恢復事件或集中決算等手段。