前言 很多同學想對CAP的機制以及用法等想有一個詳細的瞭解,所以花了將近兩周時間寫了這份中文的CAP文檔,對 CAP 還不知道的同學可以先看一下 "這篇文章" 。 本文檔為 CAP 文獻(Wiki),本文獻同時提供中文和英文版本,英文版本目前還在翻譯中,會放到Github Wiki 中。 目錄 前言 ...
前言
很多同學想對CAP的機制以及用法等想有一個詳細的瞭解,所以花了將近兩周時間寫了這份中文的CAP文檔,對 CAP 還不知道的同學可以先看一下這篇文章。
本文檔為 CAP 文獻(Wiki),本文獻同時提供中文和英文版本,英文版本目前還在翻譯中,會放到Github Wiki 中。
目錄
1、Getting Started
1.1 介紹
CAP 是一個遵循 .NET Standard 標準庫的C#庫,用來處理分散式事務以及提供EventBus的功能,她具有輕量級,高性能,易使用等特點。
目前 CAP 使用的是 .NET Standard 1.6 的標準進行開發,目前最新預覽版本已經支持 .NET Standard 2.0.
1.2 應用場景
CAP 的應用場景主要有以下兩個:
- 分散式事務中的最終一致性(非同步確保)的方案。
分散式事務是在分散式系統中不可避免的一個硬性需求,而目前的分散式事務的解決方案也無外乎就那麼幾種,在瞭解 CAP 的分散式事務方案前,可以閱讀以下 這篇文章。
CAP 沒有採用兩階段提交(2PC)這種事務機制,而是採用的 本地消息表+MQ 這種經典的實現方式,這種方式又叫做 非同步確保。
- 具有高可用性的 EventBus。
CAP 實現了 EventBus 中的發佈/訂閱,它具有 EventBus 的所有功能。也就是說你可以像使用 EventBus 一樣來使用 CAP,另外 CAP 的 EventBus 是具有高可用性的,這是什麼意思呢?
CAP 藉助於本地消息表來對 EventBus 中的消息進行了持久化,這樣可以保證 EventBus 發出的消息是可靠的,當消息隊列出現宕機或者連接失敗的情況時,消息也不會丟失。
1.3 Quick Start
- 引用 NuGet 包
使用一下命令來引用CAP的NuGet包:
PM> Install-Package DotNetCore.CAP
根據使用的不同類型的消息隊列,來引入不同的擴展包:
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.Kafka
根據使用的不同類型的資料庫,來引入不同的擴展包:
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
- 啟動配置
在 ASP.NET Core 程式中,你可以在 Startup.cs
文件 ConfigureServices()
中配置 CAP 使用到的服務:
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
// If your SqlServer is using EF for data operations, you need to add the following configuration:
// Notice: You don't need to config x.UseSqlServer(""") again!
x.UseEntityFramework<AppDbContext>();
// If you are using Dapper,you need to add the config:
x.UseSqlServer("Your ConnectionStrings");
// If your Message Queue is using RabbitMQ you need to add the config:
x.UseRabbitMQ("localhost");
// If your Message Queue is using Kafka you need to add the config:
x.UseKafka("localhost");
});
}
在 Configure()
中配置啟動 CAP :
public void Configure(IApplicationBuilder app)
{
app.UseCap();
}
2、API介面
CAP 的 API 介面只有一個,就是 ICapPublisher
介面,你可以從 DI 容器中獲取到該介面的實例進行調用。
2.1 發佈/發送
你可以使用 ICapPublisher
介面中的 Publish<T>
或者 PublishAsync<T>
方法來發送消息:
public class PublishController : Controller
{
private readonly ICapPublisher _publisher;
//在構造函數中獲取介面實例
public PublishController(ICapPublisher publisher)
{
_publisher = publisher;
}
[Route("~/checkAccount")]
public async Task<IActionResult> PublishMessage()
{
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
return Ok();
}
}
下麵是PublishAsync這個介面的簽名:
PublishAsync<T>(string name,T object)
預設情況下,在調用此方法的時候 CAP 將在內部創建事務,然後將消息寫入到 Cap.Published
這個消息表。
2.1.1 事務
事務在 CAP 具有重要作用,它是保證消息可靠性的一個基石。 在發送一條消息到消息隊列的過程中,如果不使用事務,我們是沒有辦法保證我們的業務代碼在執行成功後消息已經成功的發送到了消息隊列,或者是消息成功的發送到了消息隊列,但是業務代碼確執行失敗。
這裡的失敗原因可能是多種多樣的,比如連接異常,網路故障等等。
只有業務代碼和CAP的Publish代碼必須在同一個事務中,才能夠保證業務代碼和消息代碼同時成功或者失敗。
以下是兩種使用事務進行Publish的代碼:
- EntityFramework
using (var transaction = dbContext.Database.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.account.check",
new Person { Name = "Foo", Age = 11 });
// 你的業務代碼。
transaction.Commit();
}
你的業務代碼可以位於 Publish 之前或者之後,只需要保證在同一個事務。
當CAP檢測到 Publish 是在EF事務區域內的時候,將使用當前的事務上下文進行消息的存儲。
其中,發送的內容會序列化為Json存儲到消息表中。
- Dapper
var connString = "資料庫連接字元串";
using (var connection = new MySqlConnection(connString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.bar",
new Person { Name = "Foo", Age = 11 },
connection,
transaction);
// 你的業務代碼。
transaction.Commit();
}
}
在 Dapper 中,由於不能獲取到事務上下文,所以需要用戶手動的傳遞事務上下文到CAP中。
2.2 訂閱/消費
註意:消息端在方法實現的過程中需要實現冪等性。
使用 CapSubscribeAttribute
來訂閱 CAP 發佈出去的消息。
[CapSubscribe("xxx.services.bar")]
public void BarMessageProcessor()
{
}
這裡,你也可以使用多個 CapSubscribe[""]
來同時訂閱多個不同的消息 :
[CapSubscribe("xxx.services.bar")]
[CapSubscribe("xxx.services.foo")]
public void BarAndFooMessageProcessor()
{
}
其中,xxx.services.bar
為訂閱的消息名稱,內部實現上,這個名稱在不同的消息隊列具有不同的代表。 在 Kafka 中,這個名稱即為 Topic Name。 在RabbitMQ 中,為 RouteKey。
RabbitMQ 中的 RouteKey 支持綁定鍵表達式寫法,有兩種主要的綁定鍵:
*(星號)可以代替一個單詞.
# (井號) 可以代替0個或多個單詞.
比如在下麵這個圖中(P為發送者,X為RabbitMQ中的Exchange,C為消費者,Q為隊列)
在這個示例中,我們將發送一條關於動物描述的消息,也就是說 Name(routeKey) 欄位中的內容包含 3 個單詞。第一個單詞是描述速度的(celerity),第二個單詞是描述顏色的(colour),第三個是描述哪種動物的(species),它們組合起來類似:“
. . ”。 然後在使用
CapSubscribe
綁定的時候,Q1綁定為CapSubscribe["*.orange.*"]
, Q2 綁定為CapSubscribe["*.*.rabbit"]
和[CapSubscribe["lazy.#]
。那麼,當發送一個名為 "quick.orange.rabbit" 消息的時候,這兩個隊列將會同時收到該消息。同樣名為
lazy.orange.elephant
的消息也會被同時收到。另外,名為 "quick.orange.fox" 的消息將僅會被髮送到Q1隊列,名為 "lazy.brown.fox" 的消息僅會被髮送到Q2。"lazy.pink.rabbit" 僅會被髮送到Q2一次,即使它被綁定了2次。"quick.brown.fox" 沒有匹配到任何綁定的隊列,所以它將會被丟棄。另外一種情況,如果你違反約定,比如使用 4個單詞進行組合,例如 "quick.orange.male.rabbit",那麼它將匹配不到任何的隊列,消息將會被丟棄。
但是,假如你的消息名為 "lazy.orange.male.rabbit",那麼他們將會被髮送到Q2,因為 #(井號)可以匹配 0 或者多個單詞。
在 CAP 中,我們把每一個擁有 CapSubscribe[]
標記的方法叫做訂閱者,你可以把訂閱者進行分組。
組(Group),是訂閱者的一個集合,每一組可以有一個或者多個消費者,但是一個訂閱者只能屬於某一個組。同一個組內的訂閱者訂閱的消息只能被消費一次。
如果你在訂閱的時候沒有指定組,CAP會將訂閱者設置到一個預設的組 cap.default.group
。
以下是使用組進行訂閱的示例:
[CapSubscribe("xxx.services.foo", Group = "moduleA")]
public void FooMessageProcessor()
{
}
2.2.1 例外情況
這裡有幾種情況可能需要知道:
① 消息發佈的時候訂閱方還未啟動
Kafka:
當 Kafka 中,發佈的消息存儲於持久化的日誌文件中,所以消息不會丟失,當訂閱者所在的程式啟動的時候會消費掉這些消息。
RabbitMQ:
在 RabbitMQ 中,應用程式首次啟動會創建具有持久化的 Exchange 和 Queue,CAP 會針對每一個訂閱者Group會新建一個消費者隊列,由於首次啟動時候訂閱者未啟動的所以是沒有隊列的,消息無法進行持久化,這個時候生產者發的消息會丟失。
針對RabbitMQ的消息丟失的問題,有兩種解決方式:
i. 部署應用程式之前,在RabbitMQ中手動創建具有durable特性的Exchange和Queue,預設情況他們的名字分別是(cap.default.topic, cap.default.group)。
ii. 提前運行一遍所有實例,讓Exchange和Queue初始化。
我們建議採用第 ii 種方案,因為很容易做到。
② 消息沒有任何訂閱者
如果你發送了一條個沒有被任何訂閱者訂閱的消息,那麼此消息將會被丟棄。
3、配置
Cap 使用 Microsoft.Extensions.DependencyInjection 進行配置的註入,你也可以依賴於 DI 從json文件中讀取配置。
3.1 Cap Options
你可以使用如下方式來配置 CAP 中的一些配置項,例如
services.AddCap(capOptions => {
capOptions.FailedCallback = //...
});
CapOptions
提供了一下配置項:
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
PollingDelay | 處理消息的線程預設輪詢等待時間(秒) | int | 15 秒 |
QueueProcessorCount | 啟動隊列中消息的處理器個數 | int | 2 |
FailedMessageWaitingInterval | 輪詢失敗消息的間隔(秒) | int | 180 秒 |
FailedCallback | 執行失敗消息時的回調函數,詳情見下文 | Action | NULL |
CapOptions 提供了 FailedCallback
為處理失敗的消息時的回調函數。當消息多次發送失敗後,CAP會將消息狀態標記為Failed
,CAP有一個專門的處理者用來處理這種失敗的消息,針對失敗的消息會重新放入到隊列中發送到MQ,在這之前如果FailedCallback
具有值,那麼將首先調用此回調函數來告訴客戶端。
FailedCallback 的類型為 Action<MessageType,string,string>
,第一個參數為消息類型(發送的還是接收到),第二個參數為消息的名稱(name),第三個參數為消息的內容(content)。
3.2 RabbitMQ Options
CAP 採用的是針對 CapOptions 進行擴展來實現RabbitMQ的配置功能,所以針對 RabbitMQ 的配置用法如下:
services.AddCap(capOptions => {
capOptions.UseRabbitMQ(rabbitMQOption=>{
// rabbitmq options.
});
});
RabbitMQOptions
提供了有關RabbitMQ相關的配置:
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
HostName | 宿主地址 | string | localhost |
UserName | 用戶名 | string | guest |
Password | 密碼 | string | guest |
VirtualHost | 虛擬主機 | string | / |
Port | 埠號 | int | -1 |
TopicExchangeName | CAP預設Exchange名稱 | string | cap.default.topic |
RequestedConnectionTimeout | RabbitMQ連接超時時間 | int | 30,000 毫秒 |
SocketReadTimeout | RabbitMQ消息讀取超時時間 | int | 30,000 毫秒 |
SocketWriteTimeout | RabbitMQ消息寫入超時時間 | int | 30,000 毫秒 |
QueueMessageExpires | 隊列中消息自動刪除時間 | int | (10天) 毫秒 |
3.3 Kafka Options
CAP 採用的是針對 CapOptions 進行擴展來實現 Kafka 的配置功能,所以針對 Kafka 的配置用法如下:
services.AddCap(capOptions => {
capOptions.UseKafka(kafkaOption=>{
// kafka options.
// kafkaOptions.MainConfig.Add("", "");
});
});
KafkaOptions
提供了有關 Kafka 相關的配置,由於Kafka的配置比較多,所以此處使用的是提供的 MainConfig 字典來支持進行自定義配置,你可以查看這裡來獲取對配置項的支持信息。
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
3.4 SqlServer Options
如果你使用的是 EntityFramewrok,你用不到該配置項下的內容。
CAP 採用的是針對 CapOptions 進行擴展來實現 SqlServer 的配置功能,所以針對 SqlServer 的配置用法如下:
services.AddCap(capOptions => {
capOptions.UseSqlServer(sqlserverOptions => {
// sqlserverOptions.ConnectionString
});
});
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
Schema | Cap表架構 | string | Cap |
ConnectionString | 資料庫連接字元串 | string | null |
3.5 MySql Options
如果你使用的是 EntityFramewrok,你用不到該配置項下的內容。
CAP 採用的是針對 CapOptions 進行擴展來實現 MySql 的配置功能,所以針對 MySql 的配置用法如下:
services.AddCap(capOptions => {
capOptions.UseMySql(mysqlOptions => {
// mysqlOptions.ConnectionString
});
});
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
TableNamePrefix | Cap表名首碼 | string | cap |
ConnectionString | 資料庫連接字元串 | string | null |
4、設計原理
4.1 動機
隨著微服務架構的流行,越來越多的人在嘗試使用微服務來架構他們的系統,而在這其中我們會遇到例如分散式事務的問題,為瞭解決這些問題,我沒有發現簡單並且易於使用的解決方案,所以我決定來打造這樣一個庫來解決這個問題。
最初 CAP 是為瞭解決分散式系統中的事務問題,她採用的是 非同步確保 這種機制實現了分散式事務的最終一致性,更多這方面的信息可以查看第6節。
現在 CAP 除瞭解決分散式事務的問題外,她另外一個重要的功能就是作為 EventBus 來使用,她具有 EventBus 的所有功能,並且提供了更加簡化的方式來處理EventBus中的發佈/訂閱。
4.2 持久化
CAP 依靠本地資料庫實現消息的持久化,CAP 使用這種方式來應對一切環境或者網路異常導致消息丟失的情況,消息的可靠性是分散式事務的基石,所以在任何情況下消息都不能丟失。
對於消息的持久化分為兩種:
① 消息進入消息隊列之前的持久化
在消息進入到消息隊列之前,CAP使用本地資料庫表對消息進行持久化,這樣可以保證當消息隊列出現異常或者網路錯誤時候消息是沒有丟失的。
為了保證這種機制的可靠性,CAP使用和業務代碼相同的資料庫事務來保證業務操作和CAP的消息在持久化的過程中是強一致的。也就是說在進行消息持久化的過程中,任何一方發生異常情況資料庫都會進行回滾操作。
② 消息進入到消息隊列之後的持久化
消息進入到消息隊列之後,CAP會啟動消息隊列的持久化功能,我們需要說明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。
針對於 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消費者隊列,但是這裡面可能有例外情況,參加 2.2.1 章節。
由於 Kafka 天生設計的就是使用文件進行的消息持久化,在所以在消息進入到Kafka之後,Kafka會保證消息能夠正確被持久化而不丟失。
4.3 通訊數據流
CAP 中消息的流轉過程大致如下:
“ P ” 代表消息發送者(生產者)。 “ C ” 代表消息消費者(訂閱者)。
4.4 一致性
CAP 採用最終一致性作為的一致性方案,此方案是遵循 CAP 理論,以下是CAP理論的描述。
C(一致性)一致性是指數據的原子性,在經典的資料庫中通過事務來保障,事務完成時,無論成功或回滾,數據都會處於一致的狀態,在分散式環境下,一致性是指多個節點數據是否一致;
A(可用性)服務一直保持可用的狀態,當用戶發出一個請求,服務能在一定的時間內返回結果;
P(分區容忍性)在分散式應用中,可能因為一些分散式的原因導致系統無法運轉,好的分區容忍性,使應用雖然是一個分散式系統,但是好像一個可以正常運轉的整體
根據 “CAP”分散式理論, 在一個分散式系統中,我們往往為了可用性和分區容錯性,忍痛放棄強一致支持,轉而追求最終一致性。大部分業務場景下,我們是可以接受短暫的不一致的。
第 6 節將對此做進一步介紹。
5、實現
CAP 封裝了在 ASP.NET Core 中的使用依賴註入來獲取 Publisher (ICapPublisher
)的介面。而啟動方式類似於 “中間件” 的形式,通過在 Startup.cs 配置 ConfigureServices
和 Configure
進行啟動。
5.1 消息表
當系統引入CAP之後並首次啟動後,CAP會在客戶端生成 3 個表,分別是 Cap.Published, Cap.Received, Cap.Queue。註意表名可能在不同的資料庫具有不同的大小寫區分,如果你在運行項目的時候沒有顯式的指定資料庫生成架構(SQL Server)或者表名首碼(MySql)的話,預設情況下就是以上3個名字。
Cap.Published:這個表主要是用來存儲 CAP 發送到MQ(Message Queue)的客戶端消息,也就是說你使用 ICapPublisher
介面 Publish 的消息內容。
Cap.Received:這個表主要是用來存儲 CAP 接收到 MQ(Message Queue) 的客戶端訂閱的消息,也就是使用 CapSubscribe[]
訂閱的那些消息。
Cap.Queue: 這個表主要是CAP內部用來處理髮送和接收消息的一個臨時表,通常情況下,如果系統不出現問題,這個表將是空的。
Published
和 Received
表具有 StatusName 欄位,這個欄位用來標識當前消息的狀態。目前共有 Scheduled,Enqueued,Processing,Successed,Failed 等幾個狀態。CAP 在處理消息的過程中會依次從 Scheduled 到 Successed 來改變這些消息狀態的值。如果是狀態值為 Successed,代表該消息已經成功的發送到了 MQ 中。如果為 Failed 則代表消息發送失敗,消息發送失敗後 CAP 會對消息進行重試,直到成功。
關於數據清理: CAP 預設情況下會每隔一個小時將消息表的數據進行清理刪除,避免數據量過多導致性能的降低。清理規則為 ExpiresAt 不為空並且小於當前時間的數據。
5.2 消息格式
CAP 採用 JSON 格式進行消息傳輸,以下是消息的對象模型:
NAME | DESCRIPTION | TYPE |
---|---|---|
Id | 消息編號 | int |
Name | 消息名稱 | string |
Content | 內容 | string |
Group | 所屬消費組 | string |
Added | 創建時間 | DateTime |
ExpiresAt | 過期時間 | DateTime |
Retries | 重試次數 | int |
StatusName | 狀態 | string |
對於 Cap.Received 中的消息,會多一個
Group
欄位來標記所屬的消費者組。
5.3 EventBus
EventBus 採用 發佈-訂閱 風格進行組件之間的通訊,它不需要顯式在組件中進行註冊。
上圖是EventBus的一個Event的流程,關於 EventBus 的更多信息就不在這裡介紹了...
在 CAP 中,為什麼說 CAP 實現了 EventBus 中的全部特性,因為 EventBus 具有的兩個大功能就是發佈和訂閱, 在 CAP 中 使用了另外一種優雅的方式來實現的,另外一個 CAP 提供的強大功能就是消息的持久化,以及在任何異常情況下消息的可靠性,這是EventBus不具有的功能。
CAP 裡面發送一個消息可以看做是一個 “Event”,一個使用了CAP的ASP.NET Core 應用程式既可以進行發送也可以進行訂閱接收。
5.4 重試
重試在實現分散式事務中具有重要作用,CAP 中會針對發送失敗或者執行失敗的消息進行重試。在整個 CAP 的設計過程中有以下幾處採用的重試策略。
① 消息發送重試
在消息發送過程中,當出現 Broker 宕機或者連接失敗的情況亦或者出現異常的情況下,這個時候 CAP 會對發送的重試,重試策略為預設 15 次失敗重試,當15次過後仍然失敗時,CAP會將此消息狀態標記為失敗。
② 消息消費重試
當 Consumer 接收到消息時,會執行消費者方法,在執行消費者方法出現異常時,會進行重試。這個重試策略和 ① 是相同的。
③ 失敗消息重試
CAP 會定期針對 ① 和 ② 中狀態為“失敗的”消息進行重試,CAP會對他們進行重新“入隊(Enqueue)”,入隊時會將消息中的重試次數標記為0,狀態置為 Enqueued。
6、分散式事務
針對於分散式事務的處理,CAP 採用的是“非同步確保”這種方案。
6.1 非同步確保
非同步確保這種方案又叫做本地消息表,這是一種經典的方案,方案最初來源於 eBay,參考資料見段末鏈接。這種方案目前也是企業中使用最多的方案之一。
相對於 TCC 或者 2PC/3PC 來說,這個方案對於分散式事務來說是最簡單的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必須具有事務協調器來處理每個不同服務之間的狀態,而此種方案不需要事務協調器。
另外 2PC/TCC 這種方案如果服務依賴過多,會帶來管理複雜性增加和穩定性風險增大的問題。試想如果我們強依賴 10 個服務,9 個都執行成功了,最後一個執行失敗了,那麼是不是前面 9 個都要回滾掉?這個成本還是非常高的。
但是,並不是說 2PC 或者 TCC 這種方案不好,因為每一種方案都有其相對優勢的使用場景和優缺點,這裡就不做過多介紹了。
中文:http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html
英文:http://queue.acm.org/detail.cfm?id=1394128
7、FAQ
暫無
本文地址:http://www.cnblogs.com/savorboard/p/cap-document.html
作者博客:Savorboard
歡迎轉載,請在明顯位置給出出處及鏈接