分散式事務之解決方案(可靠消息最終一致性)

来源:https://www.cnblogs.com/haizai/archive/2019/11/28/11954339.html

5. 分散式事務解決方案之可靠消息最終一致性 5.1. 什麼是可靠消息最終一致性事務 可靠消息最終一致性方案是指當事務發起執行完全本地事務後併發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。此方案是利用消息中間件完成,如下 ...


 

5. 分散式事務解決方案之可靠消息最終一致性

5.1. 什麼是可靠消息最終一致性事務

可靠消息最終一致性方案是指當事務發起執行完全本地事務後併發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
此方案是利用消息中間件完成,如下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是通過網路通信,由於網路通信的不確定性導致分散式事務問題。
在這裡插入圖片描述
因此可靠消息最終一致性方案要解決以下幾個問題 :
1、本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即 :事務發起方在本地事務執行成功後消息必鬚髮出去,否則就丟棄消息。即實現本地事務和消息發送的原子性,要麼都成功,要麼都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。
先來嘗試下這種操作,先發送消息,再操作資料庫 :

begin transaction;
		// 1.發送MQ
		// 2.資料庫操作
commit transation;		

這種情況下無法保證資料庫操作與發送消息的一致性,因為可能發送消息成功,資料庫操作失敗。
你立馬想到第二種方案,先進行資料庫操作,再發送消息 :

begin transaction;
		// 1.資料庫操作
		// 2.發送MQ
commit transation;		

這種情況下貌似沒有問題,如果發送MQ消息失敗,就會拋出異常,導致資料庫事務回滾。但如果是超時異常,資料庫回滾,但MQ其實已經正常發送來,同樣會導致不一致。
2、事務參與方接收消息的可靠性
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重覆接收消息。
3、消息重覆消費的問題
由於網路2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重覆投遞此消息,就導致來消息的重覆消費。要解決消息重覆消費的問題就要實現事務參與方的方法冪等性。

5.2. 解決方案

5.2.1. 本地消息表方案

本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然後通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下麵以註冊送積分為例來說明 :
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。
在這裡插入圖片描述
交互流程如下 :
1、用戶註冊
用戶服務在本地事務新增用戶和增加“積分消息日誌”。(用戶表和消息表通過本地事務保證一致)
下表是偽代碼

begin transaction;
		// 1.新增用戶
		// 2.存儲積分消息日誌
commit transation;		

這種情況下,本地資料庫操作與存儲積分消息日誌處於同一事務中,本地資料庫操作與記錄消息日誌操作具備原子性。
2、定時任務掃描日誌
如何保證將消息發送給消息隊列呢?
經過第一步消息已經寫到消息日誌表中,可以啟動獨立的線程,定時對消息日誌表中的消息進行掃描併發送至消息中間件,在消息中間件反饋發送成功後刪除該消息日誌,否則等待定時任務下一周期重試。
3、消費消息
如何保證消費者一定能消費到消息呢?
這裡可以使用MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成後向MQ發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發送消息。
積分服務接收到“增加積分”消息,開始增加積分,積分增加成功後消息中間件回應ack,否則消息中間件將重覆投遞此消息。
由於消息會重覆投遞,積分服務的“增加積分”功能需要實現冪等性。

5.2.2. RocketMQ事務消息方案

RocketMQ是一個來自阿裡巴巴的分散式消息中間件,於2012年開源,併在2017年正式成為Apache頂級項目。據瞭解,包括阿裡雲上的消息產品以及收購的子公司在內,阿裡集團的消息產品全線都運行在RocketMQ之上,並且最近幾年的雙十一大促中,RocketMQ都有搶眼表現。Apache RocketMQ 4.3之後的版本正式支持事務消息,為分散式事務實現提供來便利性支持。
RocketMQ事務消息設計則主要是為瞭解決Producer端的消息發送與本地事務執行的原子性問題,RocketMQ的設計中broker與producer端的雙向通信能力,使得broker天生可以作為一個事務協調者存在;而RocketMQ本身提供的存儲機製為事務消息提供了持久化能力;RocketMQ的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3後實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決Producer端的消息發送與本地事務執行的原子性問題。
在這裡插入圖片描述
執行流程如下 :
為方便理解我們還以註冊送積分的例子來描述整個流程。
Producer即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
1、Producer發送事務消息
Producer(MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預覽狀態),註意此時這條消息消費者(MQ訂閱方)是無法消費到的。
2、MQ Server回應消息發送成功
MQ Server接收到Producer發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer執行本地事務
Producer端執行業務代碼邏輯,通過本地資料庫事務控制。
本例中,Producer執行添加用戶操作。
4、消息投遞
若Producer本地事務執行成功則自動向MQ Server發送commit消息,MQ Server接收到commit消息後將“增加積分消息”狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQ Server發送rollback消息,MQ Server接收到rollback消息後將刪除“增加積分消息”。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重覆接收消息。這裡ack預設自動回應,即程式執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主幹流程已由RocketMQ實現,對用戶則來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關註本地事務的執行狀態即可。
RocketMQ提供RocketMQLocalTransactionListener介面 :

public interface RocketMQLocalTransactionListener {
	/**
	發送prepare消息成功此方法被回調,該方法用於執行本地事務
	@param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
	@param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這裡能獲取到
	@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
	*/
	RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
	/**
	@param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態
	@return 返回事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調
	*/
	RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
  • 發送事務消息 :
    以下是RocketMQ提供用於發送事務消息的API :
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設置TransactionListener實現
producer.setTransactionListener(transactionListener);
// 發送事務消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

5.3. RocketMQ實現可靠消息最終一致性事務

5.3.1. 業務說明

本實例通過RocketMQ中間件實現可靠消息最終一致性分散式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。
上述交易步驟,張三扣減金額與給bank2發轉賬消息,兩個操作必須是一個整體性的事務。
在這裡插入圖片描述

5.3.2.程式組成部分

本示常式序組成部分如下: 資料庫:MySQL-5.7.25
包括bank1和bank2兩個資料庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服務及資料庫的關係 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操作張三賬戶, 連接資料庫bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操作李四賬戶,連接資料庫bank2
本示常式序技術架構如下 :
在這裡插入圖片描述
交互流程如下 :
1、Bank1向MQ Server發送轉賬消息;
2、Bank1執行本地事務,扣減金額;
3、Bank2接收消息,執行本地事務,添加金額。

5.3.3. 創建資料庫

創建bank1庫,並導入以下表結構和數據(包含張三賬戶)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶餘額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);

創建bank2庫,並導入以下表結構和數據(包含李四賬戶)

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶餘額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶'
更多相關文章
  • 跳過JS直接JQUERY,“不愧是你”。 對就是我。 今天開始jQuery學習第一天。 click事件方法: 滑鼠點擊 dbl事件方法: 雙擊滑鼠 mouseenter事件方法: 滑鼠進入 mouseleave事件方法: 滑鼠離開 hover: 游標懸停,多一個css可以產生和mouseenter、 ...
  • 函數的其他定義方式 函數聲明 函數表達式:把一個函數給一個變數,此時形成了函數表達式 函數調用 函數的自調用 命名函數:函數如果有名字,就是命名函數 匿名函數:函數如果沒有名字,就是匿名函數 1. 函數聲明 function f1() { console.log("助教好帥哦"); } f1(); ...
  • 若對您有用,請贊助個棒棒糖~ ...
  • 1 // 截取兩個字元串之間的子字元串,返回第一個 2 function subStringOne(text, begin, end) { 3 var regex; 4 if (end == '\\n') 5 regex = RegExp(begin + '(.+)?'); 6 else 7 reg ...
  • 基於博主也是個菜鳥,親身體驗後步驟如下: 首先,我們需要安裝node.js, https://www.runoob.com/nodejs/nodejs-install-setup.html 安裝完成後,打開命定行直接輸入node -v 就可以查看到當前安裝的node 版本了 接下來我們需要下載exp ...
  • 引入: //計算兩個數字的和 function f1(x, y) { return x + y; } //計算三個數字的和 function f2(x, y, z) { return x + y + z; } //計算四個數字的和 function f3(x, y, z, k) { return x ...
  • 前言 本篇文章預設您大概瞭解什麼是TypeScript,主要講解如何在React舊項目中安裝並使用TypeScript。 寫這個的目的主要是網上關於TypeScript這塊的講解雖然很多,但都是一些語法概念或者簡單例子,真正改造一個React舊項目使用TypeScript的文章很少。 所以在這裡記錄 ...
  • 單例模式是老生常談的一種設計模式,同時它是最簡單也是最容易被忽視的一種設計模式。單例類應該是密封類,不能被繼承,同時建議在任何情況下都要保證線程安全。 ...
一周排行
  • 前言 上一篇文章主要介紹了ObjectPool的理論知識,再來介紹一下Microsoft.Extensions.ObjectPool是如何實現的. 核心組件 ObjectPool ObjectPool 是一個泛型抽象介面,他抽象了兩個方法Get和Return Get方法用於從對象池獲取到可用對象,如 ...
  • 國內優秀的WPF開源控制項庫,Panuon.UI的優化版本。一個漂亮的、使用樣式與附加屬性的WPF UI控制項庫,值得向大家推薦使用與學習。 今天站長(Dotnet9,站長網址:https://dotnet9.com, 微信公眾號:dotnet9_com)推薦另一款開源的WPF控制項庫(PanuonUI. ...
  • WGS-84坐標系:全球定位系統使用,GPS、北斗等 GCJ-02坐標系:中國地區使用,由WGS-84偏移而來 BD-09坐標系:百度專用,由GCJ-02偏移而來 (PS:源於項目需求,本來是想讀圖片的經緯度顯示在百度離線地圖上的。後來發現定位偏差太大,仔細一想,原來是圖片和百度使用的坐標系不一樣。 ...
  • .NET Core3.1發佈 我們很高興宣佈.NET Core 3.1的發佈。實際上,這隻是對我們兩個多月前發佈的.NET Core 3.0的一小部分修複和完善。最重要的是.NET Core 3.1是長期支持(LTS)版本,並且將支持三年。和過去一樣,我們希望花一些時間來發佈下一個LTS版本。額外的 ...
  • based on https://stackoverflow.com/questions/659013/accessing-a-shared-file-unc-from-a-remote-non-trusted-domain-with-credentials ...
  • private static void PathCopyFilesWithOriginalFolder() { int sourceFilesNum = 0; try { string sourceDir = @"E:\Source"; string destDir = @"E:\Dest"; st... ...
  • 前言 上一次資料庫災備和性能優化後,資料庫專家建議,在不擴容的情況下,客戶端不能再頻繁的掃描資料庫了!一句驚醒夢中人,因為我也發現資料庫越來越卡了,自從上個項目上線後,就出現了這個情況。後來分析其原因,發現客戶端每3秒中掃描一次資料庫,一共5000+客戶端,可想而知,頻繁掃描嚴重影響到資料庫性能。所 ...
  • 2019.12.4今天開通博客,跌跌撞撞學了3年C#,感覺有了基礎但還不夠深入,有些東西學了又忘,特此開通博客做一個記錄,記錄下以後學習中的每一個知識點,再接再厲,每天進步一點點!!!!!! ...
  • 本人剛接觸.net core 由於公司項目需要部署在Linux上 近些日子學習和網上大面積搜教程 我在這給大家歸攏歸攏借鑒的教程做了套方案(我寫的可以實現 但不一定是最好的 僅供參考) 我只用過core3.0 之前的版本沒接觸過 首先需要使用Nginx反代理的項目那一定是web框架的ASP.NET ...
  • WinFrm應用程式調用WebService服務 關於WebService的創建、發佈與部署等相關操作不再贅述,傳送門如下:C# VS2019 WebService創建與發佈,並部署到Windows Server 2012R 此篇記錄一下客戶端的調用,以便後續學習使用,不足之處請指出。 建立WinF ...
x