Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
Dapr Outbox 是1.12中的功能。
本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders
本文前提知識:熟悉Dapr狀態管理
、Dapr發佈訂閱
和Outbox 模式
。
Outbox 模式
的核心是在同一個資料庫事務中保存業務數據和待發佈的事件消息,再由某個“定時任務”讀取待發佈的事件消息併發布事件(並刪除資料庫中事件消息)
相關文章:
.NET中實現Outbox模式的框架CAP,作者Savorboard
使用 dotnetcore/CAP 的本地消息表模式,聖傑
先在內部發佈一個主題(topic)
要使用Dapr Outbox,在.NET中就是調用DaprClient
的ExecuteStateTransactionAsync(...)
方法(得先完成Outbox相關的配置!),調用此方法會完成事務操作(保存業務數據和待發佈的事件消息)併發布事件消息。
string DAPR_STORE_NAME = "statestoresql";
var client = new DaprClientBuilder().Build();
var orderId = 1;
var order = new Order(orderId);
var bytes = JsonSerializer.SerializeToUtf8Bytes(order);
var upsert = new List<StateTransactionRequest>()
{
new StateTransactionRequest(orderId.ToString(), bytes, StateOperationType.Upsert)
};
// 保存狀態,併發布事件消息
await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, upsert);
public record Order([property: JsonPropertyName("orderId")] int orderId);
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: orderpubsub # 發佈訂閱組件
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestoresql # 狀態組件
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:mysecret@tcp(localhost:3306)/?allowNativePasswords=true"
- name: outboxPublishPubsub
value: orderpubsub
- name: outboxPublishTopic
value: orders
調用ExecuteStateTransactionAsync(...)
方法時,此方法把請求轉發給sidecar,sidecar會發佈一個內部主題
。所謂內部,就是供Dapr使用,用戶不用操作;所謂主題(Topic)就是一個事件;此主題格式
為:namespace + appID + topic + "outbox" ,假設appID=order-processor,topic=orders,則內部主題(Topic)名就是order-processorordersoutbox
(namespace 是與k8s有關),此主題用於判斷事務是否執行成功。
註:該內部主題(topic)預設和事件消息使用同一個Dapr發佈/訂閱組件
,可以通過配置狀態組件的元數據(metadata配置)欄位outboxPubsub
單獨指定內部主題所使用的發佈/訂閱組件。相關配置請看官方文檔
主題內容是CloudEvent
格式,發佈的事件數據如下(真正的待發佈事件消息就是json中的data欄位,後面就是讀取的此值):
{
"data":"{\"orderId\":1}",
"datacontenttype":"text/plain",
"id":"outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0",
"pubsubname":"orderpubsub",
"source":"order-processor",
"specversion":"1.0",
"time":"2024-01-25T17:12:31+08:00",
"topic":"",
"traceid":"",
"traceparent":"",
"tracestate":"",
"type":"com.dapr.event.sent"
}
有了事件的發佈者,那事件的訂閱者是誰呢?appID=order-processor的Dapr sidecar實例。可以是執行保存狀態的sidecar程式,或者是appID=order-processor的其他sidecar。
在同一事務中保存狀態和事件消息
-
在內部主題(Topic)發佈
成功
後,會在同一事務中保存狀態和事件消息,也就是將方法client.ExecuteStateTransactionAsync(...)
中的數據保存到資料庫。id為outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
的表示需待發佈事件消息,id為order-processor||1
表示狀態數據。事件消息和狀態數據保存在同一張表state
中,在mysql中其表結構和數據如下所示。 -
如果此內部主題(Topic)發佈
失敗
,調用方直接拋異常,不會執行事務操作!state
表不會有下麵兩條數據。 -
"eyJvcmRlcklkIjoxfQ=="既是狀態數據又是待發佈的事件數據;經過Base64解碼,得到該值為json格式,即:
{"orderId":1}
CREATE TABLE `state` (
`id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`value` json NOT NULL,
`isbinary` tinyint(1) NOT NULL,
`insertDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`eTag` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`expiredate` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `expiredate_idx`(`expiredate` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
id | value | isbinary | insertDate | updateDate | eTag | expiredate |
---|---|---|---|---|---|---|
outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0 | "0" | 0 | 2024-01-25 09:22:14 | 2024-01-25 09:22:14 | 07884eed-eb5d-4887-8399-051c71206ed5 | |
order-processor||1 | "eyJvcmRlcklkIjoxfQ==" | 1 | 2024-01-25 09:12:31 | 2024-01-25 09:22:14 | 3d1e368f-f6d8-4ccd-946d-c10090c7cc42 |
內部主題(Topic)的訂閱者發佈事件消息
資料庫事務執行成功後,什麼時候把事件消息發佈出去呢?
把事件消息
發佈出去是在內部主題(Topic)的訂閱者
中實現的,具體如下:
步驟X
:appID
為order-processor
的sidecar接收到內部主題(Topic)發送的事件,然後通過查詢判斷id為outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
的數據是否存在?
- 如果存在,表示狀態數據和事件消息都已保存在mysql中,則發佈
事件消息
(事件數據就前面提到的data欄位)。事件發佈成功後,則刪除id為outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
的記錄。 - 如果不存在就直接退出,停止後續操作;事件的訂閱者會多次收到訂閱消息,即重覆
步驟X
過程。
這裡會有一個問題:接收到內部主題(Topic)後,狀態和事件消息可能沒有持久化到mysql(前面提到過,Dapr sidecar是先發佈一個內部主題,再在同一事務中保存狀態和事件消息)。所以獲取狀態執行以下重試策略。刪除狀態時也是此重試策略。
bo := &backoff.ExponentialBackOff{
InitialInterval: time.Millisecond * 500,// 初始間隔
MaxInterval: time.Second * 3, // 最大間隔。重試時間超過此值時,以此值為準
MaxElapsedTime: time.Second * 10, // 累計重試時間
Multiplier: 3, // 遞增倍數
Clock: backoff.SystemClock,
RandomizationFactor: 0.1, // 隨機因數
}
總結
Dapr Outbox 執行流程簡單說就是:先發佈一個內部事件,再執行保存業務數據和事件消息,內部事件的訂閱者再發佈真正的事件消息。Dapr輪詢資料庫中待發佈事件消息
是通過訂閱一個內部主題(Topic)實現的。
因為狀態保存和事件發佈是在sidecar中執行,所以業務代碼和事件消息不在同一個事務中!!!Dapr Outbox是把業務的狀態數據和事件消息在同一個事務中保存,也就是代碼client.ExecuteStateTransactionAsync(...)
;並且狀態數據和事件消息是保存到同一張表state
中。
參考:
Enable the transactional outbox pattern
本文來自博客園,作者:尋己Tenleft,轉載請註明原文鏈接:https://www.cnblogs.com/tenleft/p/18150643