5. 分散式事務解決方案之可靠消息最終一致性 5.1. 什麼是可靠消息最終一致性事務 可靠消息最終一致性方案是指當事務發起執行完全本地事務後併發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。此方案是利用消息中間件完成,如下 ...
5. 分散式事務解決方案之可靠消息最終一致性
5.1. 什麼是可靠消息最終一致性事務
可靠消息最終一致性方案是指當事務發起執行完全本地事務後併發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
此方案是利用消息中間件完成,如下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是通過網路通信,由於網路通信的不確定性導致分散式事務問題。
因此可靠消息最終一致性方案要解決以下幾個問題 :
1、本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即 :事務發起方在本地事務執行成功後消息必鬚髮出去,否則就丟棄消息。即實現本地事務和消息發送的原子性,要麼都成功,要麼都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。
先來嘗試下這種操作,先發送消息,再操作資料庫 :
begin transaction;
// 1.發送MQ
// 2.資料庫操作
commit transation;
這種情況下無法保證資料庫操作與發送消息的一致性,因為可能發送消息成功,資料庫操作失敗。
你立馬想到第二種方案,先進行資料庫操作,再發送消息 :
begin transaction;
// 1.資料庫操作
// 2.發送MQ
commit transation;
這種情況下貌似沒有問題,如果發送MQ消息失敗,就會拋出異常,導致資料庫事務回滾。但如果是超時異常,資料庫回滾,但MQ其實已經正常發送來,同樣會導致不一致。
2、事務參與方接收消息的可靠性
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重覆接收消息。
3、消息重覆消費的問題
由於網路2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重覆投遞此消息,就導致來消息的重覆消費。要解決消息重覆消費的問題就要實現事務參與方的方法冪等性。
5.2. 解決方案
5.2.1. 本地消息表方案
本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然後通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下麵以註冊送積分為例來說明 :
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。
交互流程如下 :
1、用戶註冊
用戶服務在本地事務新增用戶和增加“積分消息日誌”。(用戶表和消息表通過本地事務保證一致)
下表是偽代碼
begin transaction;
// 1.新增用戶
// 2.存儲積分消息日誌
commit transation;
這種情況下,本地資料庫操作與存儲積分消息日誌處於同一事務中,本地資料庫操作與記錄消息日誌操作具備原子性。
2、定時任務掃描日誌
如何保證將消息發送給消息隊列呢?
經過第一步消息已經寫到消息日誌表中,可以啟動獨立的線程,定時對消息日誌表中的消息進行掃描併發送至消息中間件,在消息中間件反饋發送成功後刪除該消息日誌,否則等待定時任務下一周期重試。
3、消費消息
如何保證消費者一定能消費到消息呢?
這裡可以使用MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成後向MQ發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發送消息。
積分服務接收到“增加積分”消息,開始增加積分,積分增加成功後消息中間件回應ack,否則消息中間件將重覆投遞此消息。
由於消息會重覆投遞,積分服務的“增加積分”功能需要實現冪等性。
5.2.2. RocketMQ事務消息方案
RocketMQ是一個來自阿裡巴巴的分散式消息中間件,於2012年開源,併在2017年正式成為Apache頂級項目。據瞭解,包括阿裡雲上的消息產品以及收購的子公司在內,阿裡集團的消息產品全線都運行在RocketMQ之上,並且最近幾年的雙十一大促中,RocketMQ都有搶眼表現。Apache RocketMQ 4.3之後的版本正式支持事務消息,為分散式事務實現提供來便利性支持。
RocketMQ事務消息設計則主要是為瞭解決Producer端的消息發送與本地事務執行的原子性問題,RocketMQ的設計中broker與producer端的雙向通信能力,使得broker天生可以作為一個事務協調者存在;而RocketMQ本身提供的存儲機製為事務消息提供了持久化能力;RocketMQ的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3後實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決Producer端的消息發送與本地事務執行的原子性問題。
執行流程如下 :
為方便理解我們還以註冊送積分的例子來描述整個流程。
Producer即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
1、Producer發送事務消息
Producer(MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預覽狀態),註意此時這條消息消費者(MQ訂閱方)是無法消費到的。
2、MQ Server回應消息發送成功
MQ Server接收到Producer發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer執行本地事務
Producer端執行業務代碼邏輯,通過本地資料庫事務控制。
本例中,Producer執行添加用戶操作。
4、消息投遞
若Producer本地事務執行成功則自動向MQ Server發送commit消息,MQ Server接收到commit消息後將“增加積分消息”狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQ Server發送rollback消息,MQ Server接收到rollback消息後將刪除“增加積分消息”。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重覆接收消息。這裡ack預設自動回應,即程式執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主幹流程已由RocketMQ實現,對用戶則來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關註本地事務的執行狀態即可。
RocketMQ提供RocketMQLocalTransactionListener介面 :
public interface RocketMQLocalTransactionListener {
/**
發送prepare消息成功此方法被回調,該方法用於執行本地事務
@param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
@param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這裡能獲取到
@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
/**
@param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態
@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
- 發送事務消息 :
以下是RocketMQ提供用於發送事務消息的API :
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設置TransactionListener實現
producer.setTransactionListener(transactionListener);
// 發送事務消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
5.3. RocketMQ實現可靠消息最終一致性事務
5.3.1. 業務說明
本實例通過RocketMQ中間件實現可靠消息最終一致性分散式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。
上述交易步驟,張三扣減金額與給bank2發轉賬消息,兩個操作必須是一個整體性的事務。
5.3.2.程式組成部分
本示常式序組成部分如下: 資料庫:MySQL-5.7.25
包括bank1和bank2兩個資料庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服務及資料庫的關係 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操作張三賬戶, 連接資料庫bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操作李四賬戶,連接資料庫bank2
本示常式序技術架構如下 :
交互流程如下 :
1、Bank1向MQ Server發送轉賬消息;
2、Bank1執行本地事務,扣減金額;
3、Bank2接收消息,執行本地事務,添加金額。
5.3.3. 創建資料庫
創建bank1庫,並導入以下表結構和數據(包含張三賬戶)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶餘額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
創建bank2庫,並導入以下表結構和數據(包含李四賬戶)
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶餘額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶'