多數據源事務處理-涉及分散式事務

来源:https://www.cnblogs.com/wayn111/archive/2022/12/16/16987117.html
-Advertisement-
Play Games

在作者之前的 十二條後端開發經驗分享,純乾貨 文章中介紹的 優雅得Springboot + mybatis配置多數據源方式 里有很多小伙伴在評論區留言詢問多個數據源同時在一個方法中使用時,事務是否會正常有效,這裡作者 理論 + 實踐 給大家解答一波,老規矩,附作者github地址: https:// ...


在作者之前的 十二條後端開發經驗分享,純乾貨 文章中介紹的 優雅得Springboot + mybatis配置多數據源方式 里有很多小伙伴在評論區留言詢問多個數據源同時在一個方法中使用時,事務是否會正常有效,這裡作者 理論 + 實踐 給大家解答一波,老規矩,附作者github地址:

一. 數據源跨庫但是不跨 MySql 實例

這個形式就是數據源在同一個 MySQL 下,但是 jdbc-url 上的資料庫配置不同,涉及多個資料庫時,如果方法中發生異常,只有開啟事務的數據源會發生回滾,其他數據源不會回滾。看到這裡可能有點迷惑,什麼是 只有開啟事務的數據源會發生回滾,其他數據源不會回滾?

下麵給出代碼驗證:

主數據源配置

@Slf4j
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.core.dao", sqlSessionFactoryRef = "masterSqlSessionFactory")
public class Db1DataSourceConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource masterDataSource(DruidProperties druidProperties) {
        DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }

    /**
     * @param datasource 數據源
     * @return SqlSessionFactory
     * @Primary 預設SqlSessionFactory
     */
    @Primary
    @Bean(name = "masterSqlSessionFactory")
    public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource datasource,
                                                     Interceptor interceptor) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis掃描xml所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.core.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("masterDataSource 配置成功");
        return bean.getObject();
    }

    @Primary
    @Bean(name = "masterTransactionManager")
    public DataSourceTransactionManager masterTransactionManager(@Qualifier("masterDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

從數據源配置

@Slf4j
@ConditionalOnProperty(value = "transactional.mode", havingValue = "seata")
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.slave.dao", sqlSessionFactoryRef = "slaveSqlSessionFactory")
public class Db2DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    public DataSource slaveDataSource(DruidProperties druidProperties) {
        DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }


    /**
     * @param datasource 數據源
     * @return SqlSessionFactory
     * @Primary 預設SqlSessionFactory
     */
    @Bean(name = "slaveSqlSessionFactory")
    public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource datasource,
                                                    Interceptor interceptor) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis掃描xml所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:slavemapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.slave.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("slaveDataSource 配置成功");
        return bean.getObject();
    }
    
    @Bean(name = "slaveTransactionManager")
    public DataSourceTransactionManager slaveTransactionManager(@Qualifier("slaveDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

劃重點-上述代碼在每個數據源中都配置了 DataSourceTransactionManager(事務管理器),並且在主配置中添加 @Primary 註解,表示預設事務管理器優先使用主數據源的事務管理器。 下麵給出測試代碼:

/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void testRollback() {
        multiDataService.testRollback();
    }
}
/**
 *  MultiDataService實現類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Override
    public void testRollback() {
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
        try {
            TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            // 插入table1表
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            // 插入table2表
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
            transactionManager.commit(transaction);
            Assert.isTrue(save1 && save2);
        } catch (Exception e) {
            log.info(e.getMessage(), e);
            transactionManager.rollback(transaction);
        }
    }
}

執行結果:table1表回滾成功,table2表回滾失敗。由此結果,對於 只有開啟事務的數據源會發生回滾,其他數據源不會回滾? 我們的解釋就是 Spring 中預設使用的事務管理器是使用主數據源配置還是從數據源配置由我們通過 @Primary 決定,當我們把 @Primary 切換在從數據源配置上,執行結果:table2表回滾成功,table1表回滾失敗。那怎麼解決這個問題?

當涉及到跨庫或者跨 MySQL 實例,想要保證事務操作,我們這裡先給出XA事務解決方案。附 XA 事務的說明:

XA 是由 X/Open 組織提出的分散式事務規範,XA 規範主要定義了事務協調者(Transaction Manager)和資源管理器(Resource Manager)之間的介面。

事務協調者(Transaction Manager),因為 XA 事務是基於兩階段提交協議的,所以需要有一個協調者,來保證所有的事務參與者都完成了準備工作,也就是 2PC 的第一階段。如果事務協調者收到所有參與者都準備好的消息,就會通知所有的事務都可以提交,也就是 2PC 的第二階段。

資源管理器(Resource Manager),負責控制和管理實際資源,比如資料庫。

(劃重點)XA 的 MySQL 實現使 MySQL 伺服器能夠充當資源管理器,在全局事務中處理 XA 事務。連接到 MySQL 伺服器的客戶端程式充當事務協調者

XA 事務的執行流程

XA 事務是兩階段提交的一種實現方式,根據 2PC 的規範,XA 將一次事務分割成了兩個階段,即 Prepare 和 Commit 階段。

Prepare 階段,TM 向所有 RM 發送 prepare 指令,RM 接受到指令後,執行數據修改和日誌記錄等操作,然後返回可以提交或者不提交的消息給 TM。如果事務協調者 TM 收到所有參與者都準備好的消息,會通知所有的事務提交,然後進入第二階段。

Commit 階段,TM 接受到所有 RM 的 prepare 結果,如果有 RM 返回是不可提交或者超時,那麼向所有 RM 發送 Rollback 命令;如果所有 RM 都返回可以提交,那麼向所有 RM 發送 Commit 命令,完成一次事務操作。

下麵給出兩種基於 XA 事務的解決方案:

  • Springboot 項目中可以使用 jta,完成對 XA 協議的支持,缺點就是 jta 需要改造數據源配置
  • Springboot 項目引入 seataseata 支持 XA 協議,且引入 seata-spring-boot-starter 依賴對業務無侵入,缺點需要引入 seata-server 降低了系統可用性

Springboot 項目中可以啟用 jta

  1. 引入 spring-boot-starter-jta-atomikos
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
  1. 修改主從數據源 DataSource 配置,進行包裝添加 XA 數據源支持,如下;

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource dataSource(DruidProperties druidProperties) {
        DruidXADataSource dataSource = druidProperties.dataSource(new DruidXADataSource());
        dataSource.setUrl("jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8");
        dataSource.setUsername("root");
        dataSource.setPassword("");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("master-xa");
        atomikosDataSourceBean.setXaDataSource(dataSource);
        return atomikosDataSourceBean;
    }
  1. 添加 JtaTransactionManager
@Bean
public JtaTransactionManager transactionManager() throws Exception {
    JtaTransactionManager transactionManager = new JtaTransactionManager();
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(true);
    userTransactionManager.setTransactionTimeout(3000);
    transactionManager.setUserTransaction(userTransactionManager);
    transactionManager.setAllowCustomIsolationLevels(true);
    return transactionManager;
}
  1. 完成測試,代碼如下:
/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void jtaTestRollback() {
        multiDataService.jtaTestRollback();
    }
}
/**
 *  MultiDataService實現類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @Autowired
    private JtaTransactionManager jtaTransactionManager;
    @Override
    public void jtaTestRollback() {
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transaction = jtaTransactionManager.getTransaction(transactionDefinition);
        try {
            TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
            jtaTransactionManager.commit(transaction);
            Assert.isTrue(save1 && save2);
        } catch (Exception e) {
            log.info(e.getMessage(), e);
            jtaTransactionManager.rollback(transaction);
        }
    }
}

可以看到我們使用的是 JtaTransactionManager, 執行結果:table1表回滾成功,table2表回滾成功。驗證OK

引入 seata,添加XA協議支持

  1. 下載安裝啟動 seata-server,這裡給出官網教程:https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html
  2. 在 Springboot中引入seata最新依賴
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.5.2</version>
</dependency>
  1. 在yml文件中添加 seata 配置
seata:
  config:
    type: file
  registry:
    type: file
  application-id: newbeemall # Seata 應用編號,預設為 ${spring.application.name}
  tx-service-group: newbeemall-group # Seata 事務組編號,用於 TC 集群名
  # 服務配置項,對應 ServiceProperties 類
  service:
    # 虛擬組和分組的映射
    vgroup-mapping:
      newbeemall-group: default
    # 分組和 Seata 服務的映射
    grouplist:
      default: 127.0.0.1:8091
  data-source-proxy-mode: XA
  enabled: true
  1. 完成測試,代碼如下:
/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void seataTestRollback() {
        multiDataService.seataTestRollback();
    }
}
/**
 *  MultiDataService實現類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @GlobalTransactional
    @Override
    public void seataTestRollback() {
        log.info("當前 XID: {}", RootContext.getXID());
        TbTable1 tbTable1 = new TbTable1();
        tbTable1.setName("test1");
        boolean save1 = tbTable1Service.save(tbTable1);
        TbTable2 tbTable2 = new TbTable2();
        tbTable2.setName("test2");
        boolean save2 = tbTable2Service.save(tbTable2);
        int i = 1 / 0;
    }
}

如上代碼,使用 seata 時需要啟用 @GlobalTransactional 註解,並且在事務中傳遞 XIDRootContext.getXID()),執行結果:table1表回滾成功,table2表回滾成功。驗證OK

二. 數據源分佈在不同 MySql 實例

當數據源分佈在不同 MySql 實例時,這時候其實已經進入分散式事務的範疇,由上可知,XA 事務可以解決分散式環境下事務問題,也就是說上述最後兩種解決方案都可以解決分散式事務問題,但是實際使用過程中,我們建議使用 seata,理由是他不僅支持 XA 事務還支持 AT、Saga、TCC事務模型。引入 seata 官網介紹

Seata 是一款開源的分散式事務解決方案,致力於提供高性能和簡單易用的分散式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分散式解決方案。

總結

關於多數據源事務的問題,不管跨不跨庫其實都屬於分散式事務的問題。推薦使用 seata 解決。

實踐代碼放在newbeemall項目:https://github.com/wayn111/newbee-mall/tree/springboot2.7 分支下
image.png

歡迎大家點贊、關註、評論,想要跟作者溝通技術問題的話可以加我微信【waynaqua】,歡迎大家前來交流。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 明天就是擁抱情人節,情侶們會在公開的場合擁抱,向世人宣告你倆的愛意,也讓這個寒冷的冬天變得格外溫馨。到了年底依然能熱情擁抱,也見證了兩人情意如昔。 今天子川就給大家帶來就是的利用Python製作表白神器,記得發給自己的心儀對象。廢話不多說直接開整~ 開發工具 Python版本: 3.6 相關模 ...
  • 本文介紹如何使用 Pandas Profiling 的比較報告功能,分析兩個數據集的分佈差異,完成數據探索分析 (EDA) 的完整流程,為後續分析做準備。 ...
  • 作者:耿宗傑 前言 關於pprof的文章在網上已是汗牛充棟,卻是千篇一律的命令介紹,鮮有真正實操的,本文將參考Go社區資料,結合自己的經驗,實戰Go程式的性能分析與優化過程。 優化思路 首先說一下性能優化的一般思路。系統性能的分析優化,一定是從大到小的步驟來進行的,即從業務架構的優化,到系統架構的優 ...
  • 前言 網易雲的Vip音樂下載下來,格式不是mp3/flac這種通用的音樂格式,而是經過加密的ncm文件。只有用網易雲的音樂App才能夠打開。於是想到可不可以把.ncm文件轉換成mp3或者flac文件,上google查了一下,發現有不少人已經做了這件事,但沒有發現C語言版本的,就想著寫一個純C語言版本 ...
  • pandas 為什麼學習pandas numpy已經可以幫助我們進行數據的處理了,那麼學習pandas的目的是什麼呢? numpy能夠幫助我們處理的是數值型的數據,當然在數據分析中除了數值型的數據還有好多其他類型的數據(字元串,時間序列),那麼pandas就可以幫我們很好的處理除了數值型的其他數據! ...
  • 大家好,咱們前面通過十篇的文章介紹了docker的基礎篇,從本篇開始,咱們的《docker學習系列》將要進入到高級篇階段(基礎篇大家可以查看之前發佈的文章)。 咱們先來介紹:docker複雜方式安裝軟體。通過按照mysql\redis兩個案例來講解 Docker複雜安裝說明,兩個案例: 1:安裝my ...
  • 1 線程安全定義 含糊的定義:如果一個對象可以安全地被多個線程同時使用,那它就是線程安全的 嚴謹的定義: 當多個線程同時訪問一個對象時,如果不用考慮這些線程在運行時環境下的調度和交替執行,也不需要進行額外的同步,或者在調用方進行任何其他的協調操作,調用這個對象的行為都可以獲得正確的結果,那就稱這個對 ...
  • 大家好,我是棧長,Nacos 2.2.0 正式發佈了! Nacos 2.2.0 還真是一個比較重要的版本,因為它涉及了太多重大更新,今天棧長給大家來解讀下。 Nacos 2.2.0 重大更新 1、刪除冗餘代碼 Nacos 2.2.0 刪除了 Nacos 1.x 版本中 Naming 和雙寫相關的舊冗 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...