我們旅程的最後階段的三個主要目標是使系統對故障更具彈性,提高UI的響應能力,並確保我們的設計是可伸縮的。加強系統的工作主要集中在訂單和註冊限界上下文中的RegistrationProcessManager類。性能改進工作的重點是當訂單創建時UI與領域域模型的交互方式。 ...
旅程7:增加彈性和優化性能
到達旅程的終點:最後的任務。
“你不能飛的像一隻長著鷦鷯翅膀的老鷹那樣。”亨利·哈德遜
我們旅程的最後階段的三個主要目標是使系統對故障更具彈性,提高UI的響應能力,並確保我們的設計是可伸縮的。加強系統的工作主要集中在訂單和註冊限界上下文中的RegistrationProcessManager類。性能改進工作的重點是當訂單創建時UI與領域域模型的交互方式。
本章的工作術語定義:
本章使用了一些術語,我們將在下麵進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。
命令(Command):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。命令由單個接收方只處理一次,命令要麼通過命令匯流排(command bus)傳遞給接收方,要麼直接在進程中傳遞。如果是通過匯流排傳遞的,則該命令是非同步發送的,在進程中傳遞,命令將同步發送。
事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件發佈到事件匯流排。處理程式在事件匯流排上註冊特定類型的事件,然後將事件傳遞給訂閱伺服器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。
快照(Snapshots):快照是一種可以應用於事件源的優化。在重新還原(rehydrated)聚合時,不需要重播與聚合相關的所有持久化事件,而是載入聚合狀態的最新副本,然後只重播保存快照後持久的事件。通過這種方式,可以減少必須從事件存儲裡加載的數據量。
冪等性(Idempotency):冪等性是一個操作的特性,這意味著該操作可以多次應用而不改變結果。例如,“將x的值設置為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在消息傳遞環境中,如果消息可以多次傳遞而不改變結果,則消息是冪等的:這可能是因為消息本身的性質,也可能是因為系統處理消息的方式。
最終一致性(Eventual consistency):最終一致性是一個一致性模型,它不能保證立即訪問更新的值。對數據對象進行更新後,存儲系統不保證對該對象的後續訪問將返回更新後的值。然而,存儲系統確實保證,如果在足夠長的時間內沒有對對象進行新的更新,那麼最終所有訪問都可以返回最後更新的值。
架構
該應用程式旨在部署到Microsoft Azure。在旅程的這個階段,應用程式由兩個角色組成,一個包含ASP.Net MVC Web應用程式的web角色和一個包含消息處理程式和領域對象的工作角色。應用程式在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。在某些地方,應用程式還在寫端使用Azure table,在讀端使用Azure blob來存儲數據。應用程式使用Azure服務匯流排來提供其消息傳遞基礎設施。下圖展示了這個高級體繫結構。
在研究和測試解決方案時,可以在本地運行它,可以使用Azure compute emulator,也可以直接運行MVC web應用程式,並運行承載消息處理程式和領域域對象的控制台應用程式。在本地運行應用程式時,可以使用本地SQL Server Express資料庫,並使用一個在SQL Server Express資料庫實現的簡單的消息傳遞基礎設施。
有關運行應用程式的選項的更多信息,請參見附錄1“發佈說明”。
增加彈性
在旅程的這個階段,團隊研究了加強RegistrationProcessManager類的方法。該類負責管理訂單和註冊上下文中的聚合之間的交互,並確保它們彼此一致。如果要將限界上下文作為一個整體來維護其一致狀態,那麼流程管理器必須能夠適應各種各樣的故障條件。
通常,流程管理器接收傳入的事件,然後在限界上下文內部基於流程管理器的狀態發出一個或多個命令到聚合。當流程管理器發出命令時,它通常會更改自己的狀態。
訂單和註冊限界上下文包含RegistrationProcessManager類。此流程管理器在此限界上下文中和支付限界上下文中負責通過路由事件和命令協調聚合的活動。因此,流程管理器負責確保這些限界上下文中的聚合正確地彼此同步。
Gary(CQRS專家)發言:
一個聚合決定了寫模型中的一致性邊界,這個邊界和系統持久存儲數據的一致性相關。流程管理器管理不同聚合(可能在不同的限界上下文中)之間的關係,並確保聚合最終彼此一致。
註冊過程的失敗可能對系統產生不利後果:聚合可能彼此不同步,這可能導致系統中出現不可預測的行為,或者一些進程可能最終成為僵屍進程,繼續運行並使用資源,但永遠不會完成。團隊確定了以下與RegistrationProcessManager流程管理器相關的特定故障場景。流程管理器也許會:
- 崩潰或者無法在發送任何命令之前和接收事件之後持久化其狀態。這樣消息處理器可能無法將事件標記為完成,因此在超時之後,將事件放回Topic訂閱並重新處理。
- 在發送任何命令之前和持久化其狀態之後崩潰。這將使系統處於不一致的狀態,因為流程管理器保存了其新的狀態,但沒有發送預期的命令。原始事件被放回Topic訂閱並重新處理。
- 未能標記事件已被處理。流程管理器將第二次處理該事件,因此在超時之後,系統將把該事件重新放到服務匯流排Topic訂閱中。
- 在等待它所期望的特定事件時超時。流程管理器無法繼續處理並達到預期的最終狀態。
- 接收到一個流程管理器處於特定狀態時不期望接收的事件。這可能表明在其他地方存在問題,這意味著流程管理器繼續工作是不安全的。
這些設想可歸納為兩個具體的問題:
- RegistrationProcessManager成功地處理了一個事件,但是沒有將其標記為完成。在事件自動返回到Azure服務匯流排Topic訂閱之後,RegistrationProcessManager將再次處理該事件。
- RegistrationProcessManager成功地處理一個事件,並將其標記為完成,但隨後未能發送命令。
使系統能彈性的重新處理事件
如果流程管理器本身的行為是冪等的,那麼如果它第二次接收並處理一個事件,則不會導致系統中的不一致。使流程管理器的行為具有冪等性,可以避免前三種故障條件中固有的問題。崩潰之後,您可以簡單地重新啟動流程管理器,並第二次重新處理傳入的事件。
您可以讓流程管理器發送的所有命令都是冪等的,來替代讓流程管理器冪等。重新啟動流程管理器可能會導致第二次發送命令,但如果這些命令是冪等的,則不會對流程或系統產生不利影響。要使此方法生效,您仍然需要修改流程管理器,以確保它至少發送一次所有命令。如果命令是冪等的,那麼多次發送它們並不重要,但是如果根本不發送就很重要。
在V1版本中,大多數消息處理要麼已經是冪等的,要麼系統檢測到重覆的消息並將它們發送到dead-letter隊列。例外情況是OrderPlaced事件和SeatsReserved事件,因此團隊修改了系統V3版本處理這兩個事件的方式,以解決這個問題。
確保始終發送命令
需要事務行為來確保當RegistrationProcessManager類保存其狀態時,系統始終會發送命令。這要求團隊實現一個偽事務,因為將Azure服務匯流排和SQL資料庫表一起放到分散式事務中既不可取也不可行。
團隊為V3版本所採用的解決方案是確保系統持久保存RegistrationProcessManager生成的所有命令,同時持久保存RegistrationProcessManager實例的狀態。然後,系統嘗試發送命令,併在成功發送之後將它們從存儲中刪除。每當從存儲中載入RegistrationProcessManager實例時,系統還檢查未發送的消息。
性能優化
在這個階段,我們使用Visual Studio運行性能和壓力測試,以分析響應時間並確定瓶頸。團隊使用Visual Studio Load Test來模擬訪問應用程式的不同用戶數量,併在代碼中添加了額外的跟蹤,以記錄時間信息,以便進行詳細分析。團隊在Azure中創建了性能測試環境,在Azure VM角色實例中運行測試控制器和測試代理。這使我們能夠通過使用測試代理模擬不同數量的虛擬用戶來測試Contoso會議管理系統在不同負載下的執行情況。
作為這項工作的結果,團隊對系統進行了許多更改,以優化其性能。
Gary(CQRS專家)發言:
儘管在旅程中,團隊在項目結束時進行了性能測試和優化工作,但通常在你想做的時候就做這個工作是有意義的,這可以解決可伸縮性問題並儘快加固代碼。如果您正在構建自己的基礎設施,並且需要能夠處理高吞吐量,則尤其如此。
Markus(軟體開發人員)發言:
因為實現CQRS模式會導致對組成系統的許多不同部分的職責進行非常清晰的分離,所以我們發現添加優化和加強相對容易,因為許多必要的更改在系統中都非常容易定位。
優化前的UI流程
當註冊者創建一個訂單時,她將訪問UI中的以下頁面序列。
- 註冊頁面。該頁面根據最終一致的讀模型顯示會議的門票類型和當前可用的座位數量。註冊者選擇她想購買的每種座位類型的數量。
- 付款的頁面。此頁面顯示訂單摘要,其中包括一個總價和一個倒計時計時器,它告訴註冊者座位將保留多久。註冊者輸入她的詳細信息和首選的付款方式。
- 支付頁面。這裡模擬了一個第三方支付處理器。
- 註冊成功頁面。這將顯示支付是否成功。它向註冊者顯示一個訂單定位器代碼,並鏈接到另一個頁面,該頁面允許註冊者為參會者分配座位。
有關UI中的屏幕和流程的更多信息,請參閱第5章“準備發佈V1版本”中的“基於任務的UI”一節。
在V2版本中,系統必須在註冊頁面付款頁面之間處理以下命令和事件:
- RegisterToConference
- OrderPlaced
- MakeSeatReservation
- SeatsReserved
- MarkSeatsAsReserved
- OrderReservationCompleted
- OrderTotalsCalculated
此外,MVC控制器在發送初始RegisterToConference命令之前通過查詢讀模型來填充訂單,從而驗證是否有足夠的座位可用。
當團隊使用Visual Studio Load Test和不同的用戶負載模式來對應用程式做負載測試時,我們註意到高負載常常發生在UI等待領域完成其處理時和讀模型接收寫模型數據時。這樣無法顯示下一個頁面。特別是,隨著V2版本部署到中型的web和工作角色實例後,我們發現:
- 對於每秒少於5個訂單的恆定負載模式,所有訂單都在5秒的視窗內處理。
- 對於每秒8到10個訂單之間的恆定負載模式,許多訂單不能在5秒的視窗內處理。
- 對於每秒8到10個訂單之間的恆定負載模式,角色實例使用得不夠理想(例如CPU使用率很低)。
備註:從UI在服務匯流排上發送初始命令到讀模型中出現定價訂單,從而使UI能夠向用戶顯示下一個屏幕。5秒是我們希望看到的最大等待時間。
為瞭解決這個問題,團隊確定了兩個優化目標:UI和領域之間的交互,以及基礎設施。我們決定首先處理UI和領域之間的交互。當這不能充分提高性能時,我們還進行了基礎設施優化。
優化UI
團隊與領域專家討論了在UI向領域發送RegisterToConference命令之前,是否總是需要驗證座位可用性。
Gary(CQRS專家)發言:
這個場景說明瞭與最終一致性相關的一些實際問題。讀端(在本例中是定價訂單視圖模型)最終與寫端保持一致。通常,當您實現CQRS模式時,您應該能夠接受最終的一致性,而不需要在UI中等待更改傳播到讀取端。然而,在這種情況下,UI必須等待寫模型傳播到與特定順序相關的讀端信息。這可能表明原系統這一部分的分析和設計存在問題。
領域專家明確表示,系統應該在接受付款之前確認座位是否可用。Contoso不希望出售座位之後向註冊人解釋,這些座位是不可用的。因此,該團隊尋找了簡化流程的方法,直到註冊者看到付款屏幕為止。
Beth(業務經理)發言:
這種謹慎的策略並不適用於所有情況。在某些情況下,即使不能立即完成訂單,企業也可能寧願接受這筆錢。企業可能知道庫存很快就會補充,或者客戶很樂意等待。在我們的場景中,儘管Contoso可以在沒有票的情況下將錢退還給註冊者,註冊者也許仍然會購買機票,因為他以為系統已經確認過,這筆錢是沒法退還的。所以這很明顯是一個業務和領域專家要做的決策。
團隊確定了對UI流的以下兩個優化。
UI優化1
大多數情況下,會議有足夠的座位,註冊者不必相互爭奪來預訂座位。隨著大會的門票接近售罄,只有很短的一段時間內,報名者才會爭奪最後幾個座位。
如果會議有足夠的可用座位,那麼註冊者到達付款界面卻發現系統無法預訂座位的風險就很小。在這種情況下,V2版本里,在到達付款頁面之前執行的一些處理可以在付款頁面上當用戶輸入信息的時候非同步發生,這樣就減少了註冊者在看到付款頁面前經歷延遲的機會。
Jana(軟體架構師)發言:
從本質上講,我們所依賴的事實是,預訂會成功,所以避免了耗時的檢查。但我們仍然要在註冊人付款之前執行檢查,以確保座位是可用的。
但是,如果控制器在發送RegisterToConference命令之前就檢查併發現沒有足夠的座位來完成訂單,則可以重新顯示註冊屏幕,使註冊者能夠根據當前可用性更新其訂單。
Jana(軟體架構師)發言:
對這一戰略的一個可能改進是,在發送RegisterToConference命令之前,看看是否可能有足夠的座位可用。這可以減少註冊者在最後幾個座位售罄時調整訂單的次數。然而,這種場景發生的頻率很低,可能不值得實現。
UI優化2
在V2版本中,MVC控制器不顯示付款頁面,直到領域發佈OrderTotalsCalculated事件,並且系統更新了price-order視圖模型。此事件是控制器顯示屏幕之前發生的最後一個事件。
如果系統更早地計算總數並更新價格訂單視圖模型,控制器就可以更早地顯示付款頁面。團隊確定,訂單聚合可以在訂單下單時計算總數,而不是在預訂完成時計算總數。這將使UI流比V2版本更快的走到付款頁面。
優化基礎設施
“每天都有一些新的事實浮現,一些新的障礙那些威脅著我們的最嚴重的障礙。我想這就是為什麼這款游戲如此值得一玩的原因。” 羅伯特·弗爾肯·斯科特
團隊在旅程的這個階段添加的第二組優化與系統的基礎設施相關。這些更改同時處理了系統的性能和可伸縮性。下麵的部分描述了我們在這裡所做的最重要的更改。
非同步發送和接收命令和事件
作為優化過程的一部分,團隊更新了系統,以確保在服務匯流排上發送的所有消息都是非同步發送的。這種優化旨在提高應用程式的總體響應能力,並提高消息的吞吐量。作為此更改的一部分,團隊還使用了Transient Fault Handling Application Block來處理使用服務匯流排時遇到的任何瞬時錯誤。
Markus(軟體開發人員)發言:
這種優化導致了對基礎設施代碼的重大更改。將非同步調用與Transient Fault Handling Application Block相結合是複雜的,我們將受益於c# 4.5中的一些新的簡化語法!
Jana(軟體架構師)發言:
有關在使用Azure服務匯流排時幫助優化性能的其他經過驗證的實踐,請參閱本指南:Best Practices for Performance Improvements Using Service Bus Brokered Messaging
優化命令處理
V2版本對命令和事件使用相同的消息傳遞基礎設施——Azure服務匯流排。團隊評估了Contoso會議管理系統是否需要使用相同的基礎設施發送所有命令消息。
在決定是否繼續使用Azure服務匯流排傳輸所有命令消息時,我們考慮了許多因素。
- 哪些命令(如果有的話)可以在進程中處理?
- 如果處理一些進程中的命令,系統的彈性會降低嗎?
- 如果在進程中處理一些命令,會有顯著的性能提升嗎?
我們確定了一組命令,系統可以從會議web應用程式在進程中同步地發送這些命令。為了實現這種優化,我們必須向會議web應用程式添加一些基礎設施元素(事件存儲庫、事件匯流排和事件發佈者)。以前,這些基礎設施元素只在系統的工作角色中。
非同步命令是不存在的,它實際上是另一個事件。如果我必須接受一個你發給我的消息並且如果我不同意必鬚髮出一個事件。那這就不是你要我做什麼,而是你告訴我什麼已經做完了。乍一看,這似乎只有一點點不同,但它有很多含義。
Why do lots of developers use one-way command messaging (async handling) when it's not needed? Greg Young - DDD/CQRS Group
對事件源使用快照
性能測試還發現了使用可用座位(SeatsAvailability)聚合的瓶頸,我們使用快照的形式解決了這個瓶頸。
Jana(軟體架構師)發言:
一旦團隊確定了這個瓶頸,就很容易實現和測試這個解決方案。我們在實現CQRS模式時所遵循的方法的優點之一是:我們可以在系統中進行小的局部更改。更新不需要我們去跨系統的多個部分進行複雜的更改。
當系統從事件存儲中重新還原(rehydrates)聚合實例時,它必須載入並重播與該聚合實例關聯的所有事件。這裡可能的優化是存儲聚合狀態在最近某個時間點的滾動快照,以便系統只需要載入快照和後續事件,從而減少必須重新載入和重播的事件數量。在Contoso會議管理系統中,隨著時間的推移,唯一可能會累積大量事件的聚合是可用座位(SeatsAvailability)聚合。我們決定使用Memento模式作為快照解決方案的基礎,以便與可用座位(SeatsAvailability)聚合一起使用。我們實現的解決方案使用一個memento來捕獲座位可用性聚合的狀態,然後在緩存中保存一個memento的副本。然後,系統嘗試處理緩存的數據,而不是總是從事件存儲中重新載入聚合。
Gary(CQRS專家)發言:
通常,在事件源上下文中,快照是持久化的,而不是我們在項目中實現的臨時本地緩存。
並行發佈事件
就提高系統中事件消息的吞吐量而言,並行發佈事件被證明是最重要的優化之一。為了得到最好的結果,團隊進行了多次迭代:
- 迭代1:這種方法使用並行。使用Parallel.ForEach方法和自定義分區(把消息分配到分區中),並設置並行度的上限。它還使用同步的Azure服務匯流排API調用來發佈消息。
- 迭代2:這種方法使用了一些非同步API調用。它需要使用基於自定義信號量的節流來正確處理非同步回調。
- 迭代3:這種方法使用動態節流,它考慮到順時故障,這些故障表明向特定Topic發送了太多的消息。這種方法使用非同步的Azure服務匯流排API調用。
Jana(軟體架構師)發言:
當系統從服務匯流排檢索消息時,我們在SubscriptionReceiver和SessionSubscriptionReceiver類中採用了相同的動態節流方法。
在訂閱中過濾消息
另一個優化是向Azure服務匯流排Topic訂閱添加過濾器,以避免讀取那些稍後將被與訂閱關聯的處理程式忽略的消息。
Markus(軟體開發人員)發言:
這裡我們利用了Azure服務匯流排提供的特性。
為可用座位(SeatsAvailability)聚合創建專用接收器
這使可用座位(SeatsAvailability)聚合的接收者能夠使用支持會話的訂閱。這是為了確保每個聚合實例只有一個寫入者,因為可用座位(SeatsAvailability)聚合是一個高爭用的聚合。這阻止了我們在擴展時接收大量併發異常。
Jana(軟體架構師)發言:
在其他地方,我們使用帶有會話的訂閱來保證事件的順序。在本例中,我們使用會話是出於不同的原因——以確保每個聚合實例只有一個寫入者。
緩存會議信息
這個優化緩存了會議web網站到處使用的幾個讀模型。它包含邏輯來決定如何基於特定會議的可用座位的數量來保持緩存中的數據:如果有很多空位,系統可以緩存數據很長一段時間,但是如果很少有空位就不緩存數據。
劃分服務匯流排
團隊還對服務匯流排進行了劃分,以使應用程式更具可伸縮性,並避免在系統發送的消息量接近服務匯流排能夠處理的最大吞吐量時進行節流。每個服務匯流排Topic可以由Azure中的不同節點處理,因此通過使用多個Topic,我們可以增加潛在的吞吐量。我們考慮了以下分區方案:
- 為不同的消息類型使用不同的Topic。
- 使用多個相似的Topic,並以迴圈方式監聽和讀取它們,以分散負載。
有關這些劃分方案的詳細討論,請參閱Martin L. Abbott和Michael T. Fisher所寫的《可伸縮性規則:Web站點伸縮的50個原則》(Addison-Wesley, 2011)中的第11章“非同步通信和消息匯流排”。
我們決定為訂單聚合和可用聚合發佈的事件使用單獨的Topic,因為這些聚合負責了通過服務匯流排流動的大多數事件。
Gary(CQRS專家)發言:
並不是所有的信息都具有相同的重要性。您還可以使用消息匯流排來處理單獨的、按優先順序排列的不同的消息類型,甚至可以考慮不為某些消息使用消息匯流排。
Jana(軟體架構師)發言:
將服務匯流排與系統的任何其他關鍵組件一樣對待。這意味著您應該確保您的服務匯流排可以伸縮。此外,請記住,並非所有數據對您的業務都具有相同的價值。僅僅因為您有一個服務匯流排,並不意味著所有東西都必須經過它。明智的做法是消除低價值、高成本的流量。
其他的一些優化
團隊還執行了一些額外的優化,這些優化在下麵的實現細節部分中列出。團隊在這一階段的主要目標是優化系統,以確保UI呈現對用戶有足夠好的響應。我們還可以執行其他優化,這將有助於進一步提高性能,並優化系統使用資源的方式。例如,團隊考慮的進一步優化是擴展視圖模型生成器,該生成器填充系統中的各種讀取模型。每個承載視圖模型生成器實例的web角色都必須通過創建對Azure服務匯流排主題的訂閱來處理寫端發佈的事件。
提高性能的進一步更改
除了在提高應用程式性能的旅程的最後階段所做的更改之外,團隊還確定了一些其他更改,這些更改將導致進一步的改進。但是,這個旅程的可用時間有限,所以不可能在V3版本中進行這些更改。
- 我們嚮應用程式的許多地方添加了非同步行為,特別是在應用程式對Azure服務匯流排的調用中。然而,應用程式還有其他地方仍然執行阻塞調用。我們可以把那些同步調用改成非同步:例如,當系統訪問數據存儲時。此外,我們將使用新的語言特性,如async和await。
- 通過採用存儲轉發設計,可以批量處理消息,並減少往返於數據存儲的次數。例如,利用Azure服務匯流排會話將使我們能夠從服務匯流排接收一個會話,從數據存儲區讀取多個條目、處理多個消息、一次保存到數據存儲區,然後完成所有消息。
Markus(軟體開發人員)發言:
通過接受一個服務匯流排會話,只要您保持鎖,就只有一個會話的寫入者和監聽者。這減少了樂觀併發異常。這種設計特別適合可用座位聚合的讀和寫模型。對於具有非常小分區的訂單聚合關聯的讀模型,您可以從服務匯流排獲取多個小會話,併在每個會話上使用存儲轉發方法。儘管系統中的讀和寫模型都可以從這種方法中受益,但是在我們期望數據最終是一致的、而不是完全一致的讀模型中實現起來更容易。
- 該網站已經緩存了一些經常訪問的讀模型數據,但是我們可以將緩存的使用擴展到系統的其他區域。CQRS模式意味著我們可以將緩存視為最終一致的讀模型的一部分,如果需要,還可以使用不同的緩存或根本不使用緩存來訪問來自系統不同部分的讀模型數據。
- 我們可以改進可用座位(SeatsAvailability)聚合的緩存快照實現。本章稍後將詳細描述當前實現,其目的是始終檢查事件存儲,以查找在系統創建最新緩存快照之後到達的事件。當我們接收到要處理的新命令時,如果我們可以檢查是否仍然使用與系統創建最新緩存快照時相同的服務匯流排會話,那麼我們就可以知道事件存儲中是否還有其他事件。如果會話沒有更改,那麼我們就知道自己是惟一的寫入者,因此沒有必要檢查事件存儲。如果會話已經更改,那麼其他人可能已經將與聚合相關的事件寫入到存儲中,我們需要進行檢查。
應用程式當前使用相同的優先順序監聽所有服務匯流排訂閱上的所有消息。在實踐中,有些信息比其他信息更重要。因此,當應用程式處於壓力之下時,我們應該優先處理一些消息,以最小化對核心應用程式功能的影響。例如,我們可以識別某些願意接受更多延遲的讀模型。
Poe(IT運維人員)發言:
我們還可以在負載增加時使用自動縮放來擴展應用程式(例如使用Autoscaling Application Block),但是添加新實例需要時間。通過確定某些消息類型的優先順序,我們可以在自動縮放添加資源的同時,繼續在應用程式的關鍵領域提供性能。- 當前實現使用隨機生成的Guid作為存儲在SQL資料庫實例中的所有實體的鍵。當系統處於高負載下時,如果使用順序Guid,特別是與聚集索引相關的Guid,它的性能可能會更好。有關順序Guid的討論,請參見The Cost of GUIDs as Primary Keys。
- 作為系統優化的一部分,我們現在在進程中處理一些命令,而不是通過服務匯流排發送它們。我們可以將此擴展到其他命令,並可能擴展到流程管理器。
在當前實現中,流程管理器處理傳入消息,然後存儲庫嘗試同步發送傳出消息(如果服務匯流排由於節流行為引發任何異常,則使用 Transient Fault Handling Application Block重試發送命令)。我們可以替代使用一種類似於EventStoreBusPublisher類的機制以讓流程管理器保存一個消息列表,這些消息必須在一個事務里連同它的狀態一起發送,然後通知系統的另一部分,這個部分的職責是當有一些新消息準備好要發送的時候負責來發送消息。
Markus(軟體開發人員)發言:
負責發送消息的系統部分可以非同步發送消息。它還可以為發送消息實現動態節流,並動態控制要使用多少個並行發送器。我們當前的事件存儲實現是:為存儲在事件存儲里的每一個事件發佈一個單獨的,小的消息到消息匯流排上。我們可以將其中一些消息組合在一起,以減少服務匯流排上的I/O操作總數。例如,大型會議的可用座位(SeatsAvailability)聚合實例發佈大量事件,訂單(Order)聚合以突發方式發佈事件(當創建訂單(Order)聚合時,它同時發佈OrderPlaced事件和OrderTotalsCalculated事件)。這還將有助於減少系統中的延遲,因為目前,在那些順序很重要的場景中,我們必須在發送下一個事件之前等待一個事件已被髮送的確認。將事件序列分組到一條消息中意味著我們不需要在發佈單個事件之間等待確認。
增強可伸縮性的進一步更改
Contoso會議管理系統允許您部署web和工作者角色的多個實例,從而擴展應用程式以處理更大的負載。然而,該設計並不是完全可伸縮的,因為系統的其他一些元素,例如消息匯流排和數據存儲對最大可實現的吞吐量有限制。本節概述了我們可以對系統進行的一些更改,以刪除其中的一些約束,並顯著提高系統的可伸縮性。這次旅程的可用時間有限,所以沒能在V3版本中進行這些更改。
數據分區:系統在不同的分區中存儲不同類型的數據。在啟動代碼中,您可以看到不同的限界上下文如何使用不同的連接字元串連接到SQL資料庫實例。但是,每個限界上下文目前使用一個SQL資料庫實例,我們可以將其更改為使用多個不同的實例,每個實例都包含系統使用的特定數據集。例如,訂單和註冊限界上下文可以為不同的讀取模型使用不同的SQL資料庫實例。我們還可以考慮使用federations特性來使用分片擴展一些SQL資料庫實例。
“數據持久性是大多數可伸縮SaaS企業面臨的最困難的技術問題。”
-Evan Cooke, CTO, Twilio,Scaling High-Availability Infrastructure in the CloudJana(軟體架構師)發言:
在系統將數據存儲在Azure表存儲中的地方,我們選擇用鍵對數據進行分區以實現可伸縮性。作為使用SQL資料庫federations對數據進行切分的替代方法,我們可以將SQL資料庫實例中當前的一些讀模型數據移動到Azure表存儲或blob存儲中。- 進一步劃分服務匯流排:通過為不同的事件發佈者使用不同的Topic,我們已經對服務匯流排進行了劃分,以避免在系統發送的消息量接近服務匯流排能夠處理的最大吞吐量時進行節流。我們可以使用多個相似的Topic來進一步劃分主題,並通過迴圈監聽它們來分擔負載。有關此方法的詳細描述,請參見Abbott和Fisher在Scalability Rules: 50 Principles for Scaling Web Sites, (Addison-Wesley, 2011)中的第11章"Asynchronous Communication and Message Buses"
- 存儲和轉發:我們在前面關於性能改進的小節中介紹了存儲和轉發設計。通過批處理多個操作,您不僅減少了到數據存儲的往返次數,並減少了系統中的延遲,還增強了系統的可伸縮性,因為發出更少的請求可以減少對數據存儲的壓力。
監聽節流指示器並對其作出反應:目前,系統使用Transient Fault Handling Application Block來檢測瞬時錯誤條件,比如從Azure服務匯流排、SQL資料庫實例和Azure表存儲中檢測節流指示器。系統使用Block在這些場景中實現重試,通常使用指數回退策略。目前,我們在單個訂閱級別使用動態節流,但是,我們希望修改它來對特定主題的所有訂閱執行動態節流。類似地,我們希望在SQL資料庫實例級和Azure存儲帳戶級實現動態節流。
Jana(軟體架構師)發言:
在應用程式里實現動態節流的一個例子是從服務阻止節流,看EventStoreBusPublisher SubscriptionReceiver, SessionSubscriptionReceiver類是怎樣使用DynamicThrottling類來管理他們所使用的並行程度來發送或接收消息的。Poe(IT運維人員)發言:
每一個服務(Azure服務匯流排, SQL資料庫,Azure storage)都有自己獨特的方式來實現節流行為,併在負載過重時通知您。例如,請參見SQL Azure Throttling。重要的是要瞭解應用程式使用的不同服務可能會對您的應用程式造成的所有節流。Poe(IT運維人員)發言:
團隊還考慮使用Azure SQL資料庫商業版來取代Azure SQL資料庫Web版,但經過調查,我們確定目前版本之間的唯一區別是最大資料庫大小。不同版本沒有進行調優以支持不同類型的工作負載,而且兩個版本實現了相同的節流行為。
有關可伸縮性的其他信息,請參閱:
- Microsoft Azure Storage Abstractions and their Scalability Targets
- Best Practices for Performance Improvements Using Service Bus Brokered Messaging
在談到可伸縮性和高可用性時,重要的是不要抱有錯誤的樂觀態度。儘管使用許多建議的實踐,應用程式往往可以更有效地伸縮,並且對失敗更有彈性,但它們仍然容易出現高需求瓶頸。確保為性能測試和實現性能目標分配足夠的時間。
不停機遷移
“我常說,任何冒險工作的三分之二都是做準備” Amelia Earhart
團隊計劃在Azure中進行從V2到V3版本的無停機遷移。為了實現這一點,遷移過程使用一個運行在Azure工作者角色中的特殊處理器來執行一些遷移步驟。
遷移過程仍然需要您完成一個配置步驟來關閉V2處理器並打開V3處理器。回想起來,我們應該使用一種不同的機制來簡化從V2到V3處理器的轉換,該轉換基於處理程式本身的反饋,以指示它們何時完成了處理。
有關這些步驟的詳細信息,請參見附錄1“發佈說明”。
Poe(IT運維人員)發言:
在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。
重建讀模型
在從V2遷移到V3期間,我們必須執行的步驟之一是通過重播事件日誌中的事件來重新構建DraftOrder和PricedOrder視圖模型,以填充新的V3讀模型表。我們可以非同步執行此操作。然而,在某個時候,我們需要開始將事件從活動的應用程式發送到這些讀模型。此外,我們需要保持這些讀模型的V2和V3版本都是最新的,直到遷移過程完成,因為V2前端web角色需要V2的讀取模型數據可用,直到切換到V3前端web角色。在切換到V3前端時,我們必須確保V3讀取的模型完全是最新的。
為了使這些讀取模型保持最新,我們創建了一個作為Azure工作者角色的臨時處理器,它在遷移過程中運行。有關更多細節,請參閱會Conference解決方案中的MigrationToV3項目。該處理器執行的步驟是:
- 創建一組新的Topic訂閱,這些訂閱將接收活動事件,這些活動事件將用於填充新的V3讀模型。這些訂閱將開始累積V3應用程式部署時將處理的事件。
- 重播事件日誌中的事件,用歷史數據填充新的V3讀取模型。
- 處理活動事件並使V2的讀模型保持最新,直到V3前端是活動的,此時我們不再需要V2的讀模型。
遷移過程首先從事件存儲中重播事件,以填充新的V3讀模型。當這一切完成時,我們停止包含事件處理程式的V2處理器,併在V3處理器中啟動新的處理程式。當它們運行並跟蹤新Topic訂閱中積累的事件時,ad-hoc處理器還使V2的讀模型保持最新,因為此時我們仍然擁有V2前端。當V3工作者角色準備好時,我們可以執行一個VIP切換來使用新的V3前端。在V3前端運行之後,我們不再需要V2讀模型。
使用這種方法要解決的問題之一是,如何確定新的V3處理器應該在什麼時候從處理事件日誌中的存檔事件切換到處理實時的事件流。在將事件寫入事件日誌的過程中存在一些延遲,因此瞬時切換可能導致一些事件的丟失。團隊決定允許V3處理器暫時可以同時處理存檔事件和實時事件,這意味著可能會有重覆的事件,相同的事件存在於事件存儲區和由新訂閱累積的事件列表中。但是,我們可以檢測這些副本並相應地處理它們。
Markus(軟體開發人員)發言:
通常,我們依賴於基礎設施來檢測重覆的消息。在這個重覆事件可能來自不同來源的特定場景中,我們不能依賴於基礎設施,必須顯式地將重覆檢測邏輯添加到代碼中。
我們考慮的另一種方法是在V3處理器中同時包含V2和V3處理。使用這種方法,在遷移期間不需要一個特別的工作人員角色來處理V2事件。但是,我們決定將特定於遷移的代碼保存在一個單獨的項目中,以避免V3發行版由於包含只在遷移期間需要的功能而膨脹。
Jana(軟體架構師)發言:
如果我們在V3處理器中同時包含V2和V3處理,遷移過程會稍微容易一些。但我們認為,這種方法的好處被不必在V3處理器中維護重覆功能的好處所抵消。
遷移的每個步驟之間的間隔需要一些時間來完成,因此遷移不會導致停機,但是用戶確實會遇到延遲。我們可以從處理切換開關的一些更快的機制中獲益,比如停止V2處理器並啟動V3處理器。
實現細節
本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上查看存儲庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V3版本的代碼。
備註:不要期望代碼示例與參考實現中的代碼完全匹配。本章描述了CQRS過程中的一個步驟,隨著我們瞭解更多並重構代碼,實現可能會發生變化。
增強RegistrationProcessManager類
本節描述了團隊如何通過檢查SeatsReserved和OrderPlaced消息的重覆實例來強化RegistrationProcessManager流程管理器。
檢測無序的SeatsReserved事件
通常,RegistrationProcessManager類向SeatAvailability聚合發送一個MakeSeatReservation命令,SeatAvailability聚合在進行預訂時發佈一個SeatsReserved事件,RegistrationProcessManager接收此通知。RegistrationProcessManager在創建訂單和更新訂單時都發送一條MakeSeatReservation命令。SeatsReserve事件到達的時候可能不是按順序的,但是,系統應該尊重與最後發送的命令相關的事件。本節描述的解決方案使RegistrationProcessManager能夠識別最新的SeatsReserved消息,然後忽略任何較早的消息,而不是重新處理它們。
在RegistrationProcessManager類發送MakeSeatReservation命令之前,它將該命令的Id保存在SeatReservationCommandId變數中,如下麵的代碼示例所示:
public void Handle(OrderPlaced message)
{
if (this.State == ProcessState.NotStarted)
{
this.ConferenceId = message.ConferenceId;
this.OrderId = message.SourceId;
// Use the order id as an opaque reservation id for the seat reservation.
// It could be anything else, as long as it is deterministic from the
// OrderPlaced event.
this.ReservationId = message.SourceId;
this.ReservationAutoExpiration = message.ReservationAutoExpiration;
var expirationWindow =
message.ReservationAutoExpiration.Subtract(DateTime.UtcNow);
if (expirationWindow > TimeSpan.Zero)
{
this.State = ProcessState.AwaitingReservationConfirmation;
var seatReservationCommand =
new MakeSeatReservation
{
ConferenceId = this.ConferenceId,
ReservationId = this.ReservationId,
Seats = message.Seats.ToList()
};
this.SeatReservationCommandId = seatReservationCommand.Id;
this.AddCommand(new Envelope<ICommand>(seatReservationCommand)
{
TimeToLive = expirationWindow.Add(TimeSpan.FromMinutes(1)),
});
...
}
然後,當它處理SeatsReserved事件時,它檢查該事件的CorrelationId屬性是否匹配SeatReservationCommandId變數的最新值,如下麵的代碼示例所示:
public void Handle(Envelope<SeatsReserved> envelope)
{
if (this.State == ProcessState.AwaitingReservationConfirmation)
{
if (envelope.CorrelationId != null)
{
if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) != 0)
{
// Skip this event.
Trace.TraceWarning("Seat reservation response for reservation id {0} does not match the expected correlation id.", envelope.Body.ReservationId);
return;
}
}
...
}
註意這個Handle方法如何處理Envelope實例而不是SeatsReserved實例。作為V3版本的一部分,事件被封裝在一個包含CorrelationId屬性的Envelope實例中。EventDispatcher中的DoDispatchMessage方法分配關聯Id的值。
Markus(軟體開發人員)發言:
作為添加此功能的副作用,EventProcessor類在將事件轉發給處理程式時,不能再使用dynamic關鍵字。現在在V3中,它使用了新的EventDispatcher類,該類使用反射來標識給定消息類型的正確處理程式。
在性能測試期間,團隊發現了這個特定的SeatsReserved事件的另一個問題。由於系統在載入時其他地方出現了延遲,因此第二份SeatsReserved事件被髮布了。然後,這個Handle方法拋出一個異常,導致系統在將消息發送到dead-letter隊列之前多次重試處理該消息。為瞭解決這個特定的問題,團隊修改了這個方法,添加了else if子句,如下麵的代碼示例所示:
public void Handle(Envelope<SeatsReserved> envelope)
{
if (this.State == ProcessState.AwaitingReservationConfirmation)
{
...
}
else if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) == 0)
{
Trace.TraceInformation("Seat reservation response for request {1} for reservation id {0} was already handled. Skipping event.", envelope.Body.ReservationId, envelope.CorrelationId);
}
else
{
throw new InvalidOperationException("Cannot handle seat reservation at this stage.");
}
}
Markus(軟體開發人員)發言:
此優化僅應用於此特定消息。註意,它使用了之前保存在實例中的SeatReservationCommandId屬性的值。如果希望對其他消息執行這種檢查,則需要在流程管理器中存儲更多信息。
檢測重覆的OrderPlaced事件
為了檢測重覆的OrderPlaced事件,RegistrationProcessManagerRouter類現在執行一個檢查,以查看事件是否已經被處理。V3版本的新代碼如下麵的代碼示例所示:
public void Handle(OrderPlaced @event)
{
using (var context = this.contextFactory.Invoke())
{
var pm = context.Find(x => x.OrderId == @event.SourceId);
if (pm == null)
{
pm = new RegistrationProcessManager();
}
pm.Handle(@event);
context.Save(pm);
}
}
當RegistrationProcessManager類保存狀態併發送命令時創建偽事務
Azure中不可能有包含將RegistrationProcessManager持久化到存儲里併發送命令的事務。因此,團隊決定保存流程管理器生成的所有命令,以便在流程崩潰時不會丟失這些命令,它們可以稍後發送。我們使用另一個進程來可靠地處理髮送命令。
Markus(軟體開發人員)發言:
已經遷移到V3版本的遷移實用程式更新了資料庫模式,以適應新的存儲需求。
下麵來自SqlProcessDataContext類的代碼示例顯示了系統如何持久化所有命令以及進程管理器的狀態:
public void Save(T process)
{
var entry = this.context.Entry(process);
if (entry.State == System.Data.EntityState.Detached)
this.context.Set<T>().Add(process);
var commands = process.Commands.ToList();
UndispatchedMessages undispatched = null;
if (commands.Count > 0)
{
// If there are pending commands to send, we store them as undispatched.
undispatched = new UndispatchedMessages(process.Id)
{
Commands = this.serializer.Serialize(commands)
};
this.context.Set<UndispatchedMessages>().Add(undispatched);
}
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
throw new ConcurrencyException(e.Message, e);
}
this.DispatchMessages(undispatched, commands);
}
下麵來自SqlProcessDataContext類的代碼示例展示了系統如何發送命令消息:
private void DispatchMessages(UndispatchedMessages undispatched, List<Envelope<ICommand>> deserializedCommands = null)
{
if (undispatched != null)
{
if (deserializedCommands == null)
{
deserializedCommands = this.serializer.Deserialize<IEnumerable<Envelope<ICommand>>>(undispatched.Commands).ToList();
}
var originalCommandsCount = deserializedCommands.Count;
try
{
while (deserializedCommands.Count > 0)
{
this.commandBus.Send(deserializedCommands.First());
deserializedCommands.RemoveAt(0);
}
}
catch (Exception)
{
// We catch a generic exception as we don't know what implementation of ICommandBus we might be using.
if (originalCommandsCount != deserializedCommands.Count)
{
// If we were able to send some commands, then update the undispatched messages.
undispatched.Commands = this.serializer.Serialize(deserializedCommands);
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException)
{
// If another thread already dispatched the messages, ignore and surface original exception instead.
}
}
throw;
}
// We remove all the undispatched messages for this process manager.
this.context.Set<UndispatchedMessages>().Remove(undispatched);
this.retryPolicy.ExecuteAction(() => this.context.SaveChanges());
}
}
DispatchMessages方法還從SqlProcessDataContext類中的Find方法調用,以便當系統重新還原(rehydrates)RegistrationProcessManager實例時,它會嘗試發送任何未發送的消息。
優化UI流程
第一個優化是允許UI直接導航到註冊者頁面,前提是會議還有很多座位可用。RegistrationController類的StartRegistration方法介紹了這個變化,它現在會在創建預定併發送RegisterToConference命令之前執行一個額外的檢查,確認有足夠的剩餘座位,如下麵的代碼示例所示:
[HttpPost]
public ActionResult StartRegistration(RegisterToConference command, int orderVersion)
{
var existingOrder = orderVersion != 0 ? this.orderDao.FindDraftOrder(command.OrderId) : null;
var viewModel = existingOrder == null ? this.CreateViewModel() : this.CreateViewModel(existingOrder);
viewModel.OrderId = command.OrderId;
if (!ModelState.IsValid)
{
return View(viewModel);
}
// Checks that there are still enough available seats, and the seat type IDs submitted are valid.
ModelState.Clear();
bool needsExtraValidation = false;
foreach (var seat in command.Seats)
{
var modelItem = viewModel.Items.FirstOrDefault(x => x.SeatType.Id == seat.SeatType);
if (modelItem != null)
{
if (seat.Quantity > modelItem.MaxSelectionQuantity)
{
modelItem.PartiallyFulfilled = needsExtraValidation = true;
modelItem.OrderItem.ReservedSeats = modelItem.MaxSelectionQuantity;
}
}
else
{
// Seat type no longer exists for conference.
needsExtraValidation = true;
}
}
if (needsExtraValidation)
{
return View(viewModel);
}
command.ConferenceId = this.ConferenceAlias.Id;
this.commandBus.Send(command);
return RedirectToAction(
"SpecifyRegistrantAndPaymentDetails",
new { conferenceCode = this.ConferenceCode, orderId = command.OrderId, orderVersion = orderVersion });
}
如果沒有足夠的可用座位,控制器將重新顯示當前屏幕,顯示當前可用的座位數量,以便註冊者修改其訂單。
更改的其餘部分在RegistrationController類中的SpecifyRegistrantAndPaymentDetails方法中。下麵來自V2版本的代碼示例顯示,在優化之前,控制器在繼續跳轉到註冊頁面之前調用WaitUntilSeatsAreConfirmed方法:
[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
var order = this.WaitUntilSeatsAreConfirmed(orderId, orderVersion);
if (order == null)
{
return View("ReservationUnknown");
}
if (order.State == DraftOrder.States.PartiallyReserved)
{
return this.RedirectToAction("StartRegistration", new { conferenceCode = this.ConferenceCode, orderId, orderVersion = order.OrderVersion });
}
if (order.State == DraftOrder.States.Confirmed)
{
return View("ShowCompletedOrder");
}
if (order.ReservationExpirationDate.HasValue && order.ReservationExpirationDate < DateTime.UtcNow)
{
return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
}
var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
if (pricedOrder == null)
{
return View("ReservationUnknown");
}
this.ViewBag.ExpirationDateUTC = order.ReservationExpirationDate;
return View(
new RegistrationViewModel
{
RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
Order = pricedOrder
});
}
下麵的代碼示例顯示了這個方法的V3版本,它不再等待預訂被確認:
[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
if (pricedOrder == null)
{
return View("PricedOrderUnknown");
}
if (!pricedOrder.ReservationExpirationDate.HasValue)
{
return View("ShowCompletedOrder");
}
if (pricedOrder.ReservationExpirationDate < DateTime.UtcNow)
{
return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
}
return View(
new RegistrationViewModel
{
RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
Order = pricedOrder
});
}
備註:我們將在稍後的旅程中使這個方法非同步。
UI流程的第二個優化是在流程的前面執行訂單總數的計算。在上面的代碼示例中,SpecifyRegistrantAndPaymentDetails方法仍然調用WaitUntilOrderIsPriced方法,這將暫停界面流直到系統計算出訂單的總數並使其可用於控制器(在讀端保存在priced-order視圖模型中)。
實現此功能的關鍵變更是在訂單(Order)聚合里。Order類中的構造函數現在調用CalculateTotal方法並引發OrderTotalsCalculated事件,如下麵的代碼示例所示:
public Order(Guid id, Guid conferenceId, IEnumerable<OrderItem> items, IPricingService pricingService)
: this(id)
{
var all = ConvertItems(items);
var totals = pricingService.CalculateTotal(conferenceId, all.AsReadOnly());
this.Update(new OrderPlaced
{
ConferenceId = conferenceId,
Seats = all,
ReservationAutoExpiration = DateTime.UtcNow.Add(ReservationAutoExpiration),
AccessCode = HandleGenerator.Generate(6)
});
this.Update(new OrderTotalsCalculated { Total = totals.Total, Lines = totals.Lines != null ? totals.Lines.ToArray() : null, IsFreeOfCharge = totals.Total == 0m });
}
之前,在V2版本中,訂單(Order)聚合一直等到收到MarkAsReserved命令才調用CalculateTotal方法。
非同步接收、完成和發送消息
本節概述了系統現在如何非同步地在Azure服務匯流排上執行所有I/O。
非同步接收消息
SubscriptionReceiver和SessionSubscriptionReceiver類現在非同步接收消息,而不是在ReceiveMessages方法的迴圈中同步接收消息。
有關詳細信息,請參閱SubscriptionReceiver類中的ReceiveMessages方法或SessionSubscriptionReceiver類中的ReceiveMessagesAndCloseSession方法。
Markus(軟體開發人員)發言:
此代碼示例還展示瞭如何使用Transient Fault Handling Application Block來可靠地非同步接收來自服務匯流排Topic的消息。非同步迴圈使代碼更難以讀取,但效率更高。這是推薦的最佳實踐。這段代碼將受益於c# 4中新的async關鍵字。
非同步完成消息
系統使用peek/lock機制從服務匯流排Topic訂閱中檢索消息。要瞭解系統如何非同步執行這些操作,請參閱SubscriptionReceiver和SessionSubscriptionReceiver類中的ReceiveMessages方法。這提供了一個系統如何使用非同步api的例子。
非同步發送消息
應用程式現在非同步發送服務匯流排上的所有消息。有關詳細信息,請參見TopicSender類。
在進程中同步處理命令
在V2版本中,系統使用Azure服務匯流排將所有命令傳遞給它們的接收者。這意味著系統非同步地交付命令。在V3版本中,MVC控制器現在同步地在進程中發送命令,以便通過繞過命令匯流排並將命令直接傳遞給處理程式來改進UI中的響應時間。此外,在ConferenceProcessor工作者角色中,發送到訂單(Order)聚合的命令使用相同的機制在進程中同步發送。
Markus(軟體開發人員)發言:
我們仍然非同步地向可用座位(SeatsAvailability)聚合發送命令,因為隨著RegistrationProcessManager的多個實例並行運行,將會出現爭用,因為多個線程都試圖訪問可用座位(SeatsAvailability)聚合的同一個實例。
團隊實現這種行為通過添加SynchronousCommand