Mybatis-Plus集成Sharding-JDBC與Flyway實現多租戶分庫分表

来源:https://www.cnblogs.com/xfishup/archive/2023/11/20/17845187.html
-Advertisement-
Play Games

背景 公司產品部收到了一些重要客戶的需求,他們希望能夠依賴獨立的資料庫存儲來支持他們的業務數據。與此同時,仍有許多中小客戶,可以繼續使用公共庫以滿足其需求。技術實現方面,此前持久層框架使用的Mybatis-plus,部分業務場景使用到了Sharding-JDBC用於分表,另外,我們的資料庫版本控制工 ...


背景

公司產品部收到了一些重要客戶的需求,他們希望能夠依賴獨立的資料庫存儲來支持他們的業務數據。與此同時,仍有許多中小客戶,可以繼續使用公共庫以滿足其需求。技術實現方面,此前持久層框架使用的Mybatis-plus,部分業務場景使用到了Sharding-JDBC用於分表,另外,我們的資料庫版本控制工具使用的是Flyway。

方案說明

這裡將方案進行簡要說明,配置統一通過Nacos管理(有需要的可以自行定義租戶配置頁面)。

  • 1.首先多數據源管理使用Mybatis-Plus官方推薦的dynamic-datasource-spring-boot-starter組件,需要註意的是構建動態多數據源時,我們要把Sharding-JDBC數據源也納入管理。因為我們的庫裡面畢竟只有部分表用到了Sharding-JDBC,這樣可以復用數據源。
    file
  • 2.其次,租戶與數據源之間在Nacos建立關係配置,確保根據租戶ID能夠路由到唯一的租戶數據源。我們需要自定義Sharding分片策略和多數據源切換邏輯,根據http請求傳入的租戶ID,設置正確的數據源。
    file
  • 3.動態數據源與Sharding數據源配置做為公共配置在Nacos維護,在業務服務啟動時,讀取公共配置初始化多數據源,並添加對公共多數據源配置的監聽。當配置變更時,重新構造Sharding數據源,並並更新動態多數據源。另外資料庫腳本通過自定義flyway配置執行。
    file

技術實現

前提

需要在Nacos提前維護租戶與數據源關係配置。

不使用Sharding-JDBC場景

  • 1.引入相關組件依賴。
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.flywaydb</groupId>
            <artifactId>flyway-core</artifactId>
            <version>7.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>
  • 2.關閉Flyway自動配置和配置多數據源。
spring:
  flyway:
    #關閉flyway自動配置,自定義實現
    enabled: false
  datasource:
    dynamic:
      #預設數據源
      primary: ds0
      datasource:
        ds0:
          type: com.alibaba.druid.pool.DruidDataSource
          driverClassName: org.postgresql.Driver
          url: jdbc:postgresql://127.0.0.1:5432/ds0
          username: ds0
          password: ds0123
        ds1:
          type: com.alibaba.druid.pool.DruidDataSource
          driverClassName: org.postgresql.Driver
          url: jdbc:postgresql://127.0.0.1:5432/ds1
          username: ds1
          password: ds1123
  • 3.自定義實現Flyway配置類,對應的flyway腳本目錄結構見下圖,主庫和租戶庫SQL腳本獨立維護。
Java
@Slf4j
@Configuration
@EnableTransactionManagement
public class FlywayConfig {
    @Value("${spring.application.name}")
    private String appName;
    @Autowired
    private DataSource dataSource;

    @Bean
    public void migrate() {
        log.info("flyway開始逐數據源執行腳本");
        DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
        Map<String, DataSource> dataSources = ds.getDataSources();
        dataSources.forEach((k, v) -> {
            if (!"sharding".equals(k)) {
						    // Flyway相關參數建議通過配置管理,以下代碼僅供參考
                Flyway flyway = Flyway.configure()
                        .dataSource(v)
                        .table("t_" + k + "_" + appName + "_version")
                        .baselineOnMigrate(true)
                        .outOfOrder(true)
                        .baselineVersion("1.0.0")
                        .baselineDescription(k + "初始化")
                        .locations(CommonConstant.SQL_BASE_LOCATION + (CommonConstant.DEFAULT_DS_NAME.equals(k) ? CommonConstant.MASTER_DB : CommonConstant.TENANT_DB))
                        .load();
                flyway.migrate();
                log.info("flyway在 {} 數據源執行腳本成功", k);
            }
        });
    }
}

file

  • 4.自定義實現數據源切換Filter類。
@Slf4j
@Component
@WebFilter(filterName = "dynamicDatasourceFilter", urlPatterns = {"/*"})
public class DynamicDatasourceFilter implements Filter {
    // 構建演示用租戶與數據源關係配置
		private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
				tenantDsMap.put("tenant789", "ds1");
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        // 從請求頭獲取租戶ID
        String tenantId = httpRequest.getHeader(CommonConstant.TENANT_HEADER);
        try {
            // 設置數據源
            if (tenantDsMap.get(tenantId) == null) {
                // 如果根據租戶ID未找到租戶數據源配置,預設走主庫
                DynamicDataSourceContextHolder.push(CommonConstant.DEFAULT_DS_NAME);
            } else {
                //註意,如果是分片表,那麼需要在分片表Service類或方法上加@DS("sharding")註解,最終由sharding的庫分片策略決定SQL在哪個庫執行。而這裡的設置將會被@DS註解配置覆蓋
                DynamicDataSourceContextHolder.push(tenantDsMap.get(tenantId));
            }
            // 執行
            chain.doFilter(request, response);
        } catch (Exception e) {
            log.error("切換數據源失敗,tenantId={},請求介面uri={},異常原因:{}", tenantId, httpRequest.getRequestURI(), ExceptionUtils.getStackTrace(e));
        } finally {
            // 清空當前線程數據源
            DynamicDataSourceContextHolder.poll();
        }
    }

file

使用Sharding-JDBC

如果微服務還需要使用Sharding分片,那麼還需要引入sharding-jdbc組件依賴,並配置sharding數據源和分片規則。如果是多個服務共用資料庫,那麼建議將Sharding數據源配置做為公共配置在Nacos管理,而Sharding分片規則則做為服務個性化配置單獨維護(分片規則基本不需要動態變更),這樣當有新租戶需要申請開通獨立租戶庫的時候,直接變更Sharding數據源公共配置,服務在監聽到公共配置變更後,即可重新構建新的Sharding數據源實例和動態數據源更新,無需重啟服務。

  • 1.引入sharding-jdbc組件依賴。
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>4.1.1</version>
        </dependency>
  • 2.配置Sharding數據源和分片規則。
# sharding數據源配置
dataSources:
  ds0: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds0
    username: ds0
    password: ds0123
  ds1: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds1
    username: ds1
    password: ds1123
  ds2: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds2
    username: ds2
    password: ds2123
# sharding分片規則配置
shardingRule:
  tables:
    t_order:
      actualDataNodes: ds$->{0..2}.t_order$->{0..1}
      tableStrategy:
        inline:
          shardingColumn: order_no
          algorithmExpression: t_order$->{order_no.toBigInteger() % 2}
  defaultDataSourceName: ds0
  # 預設庫分片策略
  defaultDatabaseStrategy:
    standard:
      shardingColumn: tenant_id
			# 自定義精確分片策略
      preciseAlgorithmClassName: cn.xtstu.demo.config.CustomDataSourcePreciseShardingAlgorithm
    #hint:
		    # 
    #  algorithmClassName: cn.xtstu.demo.config.CustomHintShardingAlgorithm
  defaultTableStrategy:
    none:
props:
  sql.show: true
  • 3.自定義精確分片策略。
public class CustomDataSourcePreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {

    // 構建演示用租戶與數據源關係配置
		private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
				tenantDsMap.put("tenant789", "ds1");
    }
		
    @Override
    public String doSharding(Collection<String> dataSourceNames, PreciseShardingValue<String> shardingValue) {
		    // 庫分片策略配置的分片鍵是欄位tenant_id,根據分片鍵查詢配置的數據源
		    String dsName = tenantDsMap.get(shardingValue.getValue());
        // 如果如前文所屬,Sharding子數據源key與dynamic數據源key保持一致的話,這裡直接返回就行了
				return dsName;
        // TODO 需要處理未匹配到數據源的情況
    }
}
  • 4.自定義Hint分片策略(可選),適用於分片鍵與SQL無關的場景。
public class CustomHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {

    // 構建演示用租戶與數據源關係配置
		private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
				tenantDsMap.put("tenant789", "ds1");
    }
		
    @Override
    public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) {
        Collection<String> result = new ArrayList<>();
        // 從請求頭取到當前租戶ID
				HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        result.add(tenantDsMap.get(request.getHeader("tenantId")));
				// TODO  需要處理未匹配到數據源的情況
        return result;
    }
}
  • 5.自定義動態數據源配置(核心就是將sharding數據源及其子數據源添加到動態數據源一起管理)。
@Slf4j
@Configuration
public class CustomDynamicDataSourceConfig {
    @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
    private String dataId;
    @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
    private String group;
    @Resource
    private DynamicDataSourceProperties properties;
    @Resource
    private NacosHelper nacosHelper;

    /**
     * 啟動時通過查詢Nacos上sharding數據源及分片規則yaml配置初始化sharding-jdbc數據源
     *
     * @return
     */
    @Bean
    public ShardingDataSource shardingDataSource() {
        ConfigService configService = nacosHelper.getConfigService();
        if (configService == null) {
            log.error("連接nacos失敗");
        }
        String configInfo = null;
        try {
            configInfo = configService.getConfig(dataId, group, 5000);
        } catch (NacosException e) {
            log.error("獲取{}配置失敗,異常原因:{}", dataId, ExceptionUtils.getStackTrace(e));
        }
        if (StringUtils.isBlank(configInfo)) {
            log.error("{}配置為空,啟動失敗", dataId);
            throw new NullPointerException(dataId + "配置為空");
        }
        try {
            // 通過工廠類和yaml配置創建Sharding數據源
            return (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error("創建sharding-jdbc數據源異常:{}", ExceptionUtils.getStackTrace(e));
            throw new NullPointerException("sharding-jdbc數據源為空");
        }
    }

    /**
     * 將動態數據源設置為首選的
     * 當spring存在多個數據源時, 自動註入的是首選的對象
     * 設置為主要的數據源之後,就可以支持shardingJdbc原生的配置方式了
     */
    @Primary
    @Bean
    public DataSource dataSource() {
        DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
        dataSource.setPrimary(properties.getPrimary());
        dataSource.setStrict(properties.getStrict());
        dataSource.setStrategy(properties.getStrategy());
        dataSource.setP6spy(properties.getP6spy());
        dataSource.setSeata(properties.getSeata());
        return dataSource;
    }

    /**
     * 初始化動態數據源
     *
     * @return
     */
    @Bean
    public DynamicDataSourceProvider dynamicDataSourceProvider(ShardingDataSource shardingDataSource) {
        return new AbstractDataSourceProvider() {
            @Override
            public Map<String, DataSource> loadDataSources() {
                Map<String, DataSource> dataSourceMap = new HashMap<>();
                // 將sharding數據源整體添加到動態數據源里
                dataSourceMap.put(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
                // 同時把sharding內部管理的子數據源也添加到動態數據源里
                Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
                dataSourceMap.putAll(shardingInnerDataSources);
                return dataSourceMap;
            }
        };
    }
}
  • 6.最後給出一份通過監聽Nacos配置變更動態更新數據源的示例代碼。註意:這份示例代碼中只給出了Sharding配置變更時的處理邏輯,如果是dynamic數據源配置的話,有需要的可以參考著自行實現。
@Slf4j
@Configuration
public class NacosShardingConfigListener {
    @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
    private String dataId;
    @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
    private String group;
    @Value("${spring.application.name}")
    private String appName;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private NacosHelper nacosHelper;

    @PostConstruct
    public void shardingConfigListener() throws Exception {
        ConfigService configService = nacosHelper.getConfigService();
        if (configService == null) {
            return;
        }
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                log.info("configInfo:\n{}", configInfo);
                if (StringUtils.isBlank(configInfo)) {
                    log.warn("sharding-jdbc配置為空,不會刷新數據源");
                    return;
                }
                try {
                    if (StringUtils.isNotBlank(configInfo)) {
                        // 通過yaml配置創建sharding數據源(註意:如果分片規則是獨立配置文件,那麼需要提前合併數據源和分片規則配置)
                        ShardingDataSource shardingDataSource = (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
                        Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
                        DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
                        // 遍歷sharding子數據源
                        for (String poolName : shardingInnerDataSources.keySet()) {
                            // TODO 這裡還有個細節,如果yaml配置刪減了數據源,對應數據源應該要從ds中remove掉,且主數據源不能被remove。另外其實只有新增的數據源才需要執行flyway腳本
                            // 將sharding子數據源逐個添加到動態數據源
                            ds.addDataSource(poolName, shardingInnerDataSources.get(poolName));
                            // 通過代碼完成數據源Flyway配置,並執行遷移操作
														Flyway flyway = Flyway.configure()
                                    .dataSource(dataSource)
                                    .table("t_" + poolName + "_" + appName + "_version")
                                    .baselineOnMigrate(true)
                                    .outOfOrder(true)
                                    .baselineVersion("1.0.0")
                                    .baselineDescription(poolName + "初始化")
                                    .locations(CommonConstant.SQL_BASE_LOCATION + CommonConstant.TENANT_DB)
                                    .load();
                            flyway.migrate();
                        }
                        // 將sharding數據源自身也添加到動態數據源
                        ds.addDataSource(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
                        log.info("動態數據源刷新完成,現有數據源:{}", JSONUtil.toJsonStr(ds.getDataSources().keySet()));
                    }
                } catch (Exception e) {
                    log.error("創建sharding-jdbc數據源異常:{}", ExceptionUtils.getStackTrace(e));
                }
            }
        });
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • SubScribe即發佈訂閱模式,在工作中有著廣泛的應用,比如跨組件通信,微前端系統中跨子應用通信等等。 以下是一個簡易的實現: 訂閱 初始化時可限制類型 發佈 限制類型是為了讓訂閱者和發佈者知道預製了哪些類型,避免使用了一些對方不知道的類型。 type Subscriber<T> = (param ...
  • 使用集團的統一埋點採集能力和埋點平臺,完成達達7條業務線共43個站點應用的埋點遷移,降低自研採集工具和平臺的研發投入和機器成本,打通數據鏈路,創造更多的數據分析價值 ...
  • 大家好,我是Java陳序員。 我們在日常開發中,會有很多的應用環境,開發環境、測試環境、回歸環境、生產環境等等。 這些環境,需要部署在一臺台的伺服器上,有的可能是物理機,有的可能是雲伺服器。 那麼,這麼多主機我們要怎麼運維整理呢? 今天,給大家介紹一個輕量級的自動化運維平臺。 項目介紹 Spug—— ...
  • Vue3中響應數據核心是 reactive , reactive 中的實現是由 proxy 加 effect 組合,我們先來看一下 reactive 方法的定義 ...
  • 背景 近期查看公司項目的請求日誌,發現有一段來自俄羅斯首都莫斯科(根據IP是這樣,沒精力溯源)的異常請求,看傳參就能猜到是EXP攻擊,不是瞎掃描瞎傳參的那種。日誌如下(已做部分修改): [2023-11-17 23:54:34] local.INFO: url : http://xxx/_ignit ...
  • Callable介面和Future介面 創建線程的方式 1.繼承Thread類2.實現Runnable介面3.Callable介面4.線程池方式 Callable介面 在繼承Thread類和實現Runnable介面的方式創建線程時,線程執行的run方法中,返回值是void,即無法返回線程的執行結果, ...
  • 十二、指針和引用(二) 1、指針和數組的關係 1)思考 ​ 假設你要設計一種編程語言,你要如何實現數組呢?思考之前請先牢記:數組在記憶體中是連續的,維度由低到高(大部分操作系統下)。 2)彙編分析數組如何實現 //C++代碼 #include <iostream> int main() { int a ...
  • 主要涵蓋了Spring與持久層技術的整合,以及Spring與MyBatis的整合。第一章介紹了為什麼需要將Spring與持久層技術整合,以及Spring可以與哪些持久層技術進行整合。第二章詳細介紹了Spring與MyBatis的整合思路、開發步驟和編碼實例,並解決了MyBatis開發過程中存在的問題... ...
一周排行
    -Advertisement-
    Play Games
  • 最近做項目過程中,使用到了海康相機,官方只提供了C/C++的SDK,沒有搜尋到一個合適的封裝了的C#庫,故自己動手,簡單的封裝了一下,方便大家也方便自己使用和二次開發 ...
  • 前言 MediatR 是 .NET 下的一個實現消息傳遞的庫,輕量級、簡潔高效,用於實現進程內的消息傳遞機制。它基於中介者設計模式,支持請求/響應、命令、查詢、通知和事件等多種消息傳遞模式。通過泛型支持,MediatR 可以智能地調度不同類型的消息,非常適合用於領域事件處理。 在本文中,將通過一個簡 ...
  • 前言 今天給大家推薦一個超實用的開源項目《.NET 7 + Vue 許可權管理系統 小白快速上手》,DncZeus的願景就是做一個.NET 領域小白也能上手的簡易、通用的後臺許可權管理模板系統基礎框架。 不管你是技術小白還是技術大佬或者是不懂前端Vue 的新手,這個項目可以快速上手讓我們從0到1,搭建自 ...
  • 第1章:WPF概述 本章目標 瞭解Windows圖形演化 瞭解WPF高級API 瞭解解析度無關性概念 瞭解WPF體繫結構 瞭解WPF 4.5 WPF概述 ​ 歡迎使用 Windows Presentation Foundation (WPF) 桌面指南,這是一個與解析度無關的 UI 框架,使用基於矢 ...
  • 在日常開發中,並不是所有的功能都是用戶可見的,還在一些背後默默支持的程式,這些程式通常以服務的形式出現,統稱為輔助角色服務。今天以一個簡單的小例子,簡述基於.NET開發輔助角色服務的相關內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 第3章:佈局 本章目標 理解佈局的原則 理解佈局的過程 理解佈局的容器 掌握各類佈局容器的運用 理解 WPF 中的佈局 WPF 佈局原則 ​ WPF 視窗只能包含單個元素。為在WPF 視窗中放置多個元素並創建更貼近實用的用戶男面,需要在視窗上放置一個容器,然後在這個容器中添加其他元素。造成這一限制的 ...
  • 前言 在平時項目開發中,定時任務調度是一項重要的功能,廣泛應用於後臺作業、計劃任務和自動化腳本等模塊。 FreeScheduler 是一款輕量級且功能強大的定時任務調度庫,它支持臨時的延時任務和重覆迴圈任務(可持久化),能夠按秒、每天/每周/每月固定時間或自定義間隔執行(CRON 表達式)。 此外 ...
  • 目錄Blazor 組件基礎路由導航參數組件參數路由參數生命周期事件狀態更改組件事件 Blazor 組件 基礎 新建一個項目命名為 MyComponents ,項目模板的交互類型選 Auto ,其它保持預設選項: 客戶端組件 (Auto/WebAssembly): 最終解決方案裡面會有兩個項目:伺服器 ...
  • 先看一下效果吧: isChecked = false 的時候的效果 isChecked = true 的時候的效果 然後我們來實現一下這個效果吧 第一步:創建一個空的wpf項目; 第二步:在項目裡面添加一個checkbox <Grid> <CheckBox HorizontalAlignment=" ...
  • 在編寫上位機軟體時,需要經常處理命令拼接與其他設備進行通信,通常對不同的命令封裝成不同的方法,擴展稍許麻煩。 本次擬以特性方式實現,以兼顧維護性與擴展性。 思想: 一種命令對應一個類,其類中的各個屬性對應各個命令段,通過特性的方式,實現其在這包數據命令中的位置、大端或小端及其轉換為對應的目標類型; ...