利用 Amazon EMR Serverless、Amazon Athena、Apache Dolphinscheduler 以及本地 TiDB 和 HDFS 在混合部署環境中構建無伺服器數據倉庫(一)雲上雲下數據同步方案設計

来源:https://www.cnblogs.com/DolphinScheduler/p/18157437
-Advertisement-
Play Games

引言 在數據驅動的世界中,企業正在尋求可靠且高性能的解決方案來管理其不斷增長的數據需求。本系列博客從一個重視數據安全和合規性的 B2C 金融科技客戶的角度來討論雲上雲下混合部署的情況下如何利用亞馬遜雲科技雲原生服務、開源社區產品以及第三方工具構建無伺服器數據倉庫的解耦方法。 Apache EMR(E ...


引言

在數據驅動的世界中,企業正在尋求可靠且高性能的解決方案來管理其不斷增長的數據需求。本系列博客從一個重視數據安全和合規性的 B2C 金融科技客戶的角度來討論雲上雲下混合部署的情況下如何利用亞馬遜雲科技雲原生服務、開源社區產品以及第三方工具構建無伺服器數據倉庫的解耦方法。

file

Apache EMR(Elastic MapReduce)Serverless 是亞馬遜雲科技推出的一種全托管的無伺服器大數據處理服務。它基於 Apache Spark 和 Apache Hive 計算引擎,提供計算和存儲分離的架構,實現架構彈性的同時,增強了性能。

Apache DolphinScheduler 是一種與 EMR 集群解耦部署的多功能工作流調度程式,可確保高效可靠的數據編排和處理。此外,Amazon Athena 使客戶能夠使用標準 SQL 執行 Ad-hoc 查詢並分析大量數據集,從而無需複雜的基礎設施管理。通過 AWS 控制台實現的開放的集成測試,為這些組件的無縫集成和驗證提供了可能,大大加快了工程師的工作效率。對於金融科技客戶,EMR Serverless 可以提供業務線(LOB)級別的精細資源消費分析,從而實現精確監控和成本優化。這一功能在金融領域尤其有價值。因為在該領域,運營敏捷性和成本效益至關重要。B2C 金融科技客戶非常重視數據安全性和合規性。為瞭解決這些問題,本案客戶採用了本地和雲混合架構。敏感數據存儲在本地。本博客討論了實現本地系統和雲環境之間數據無縫同步的具體解決方案。該解決方案使客戶能夠保持對敏感信息的嚴格控制,同時受益於雲計算的可擴展性和靈活性。

本文著重探討雲上雲下數據同步方案的設計。

架構設計

金融科技客戶非常關註數據安全和合規性。對於博客中討論的具體案例,業務數據存儲在本地 TiDB 上,而用戶行為數據通過Sensors Data套件收集,存儲在本地 HDFS 上。 TiDB 是亞馬遜雲科技全球合作伙伴。

亞馬遜雲科技上的 TiDB 產品服務信息可以通過此鏈接獲取。Sensors Data 是亞馬遜雲科技大中國區的合作伙伴。 亞馬遜雲科技上的 Senors Data 產品服務信息可以通過此鏈接獲取。

這些本地數據源通亞馬遜雲科技 Direct Connect 連接到亞馬遜雲科技的 Region。在亞馬遜雲科技的環境中,數據流經 Interface Endpoint for S3、亞馬遜雲科技 PrivateLink,最終訪問 S3 存儲桶(如下圖所示,存儲桶名為 ODS(示例))。介面終端節點由通過 Amazon Route53 托管的 DNS 解析器註冊和管理。

然後,數據由 Amazon EMR Serverless Job(Hive 作業或 Spark 作業)處理,以實現數據倉庫分層邏輯。不同的分層數據存儲在單獨的 S3 存儲桶中或同一 S3 存儲桶下的比不同的 S3 首碼中。這些數據的架構通過 Glue 數據目錄進行管理,並且可以通過 Amazon Athena 控制台進行查詢。

第三方 BI 工具通過 JDBC 與 Amazon Athena 進一步集成,實現數據可視化和生成數據報告,滿足不同的業務需求,包括監管要求。

EMR Serveless Job 通過在 3 個 EC2 實例上以集群模式部署的 Apache DolphinScheduler 進行編排。

DolphinScheduler 集群與其編排的 EMR 作業解耦部署,實現了整個系統的高可靠性:一個(EMR 作業或調度器)發生故障不會影響另一個(調度器或 EMR 作業)。


解決方案系統架構圖

雲上雲下數據同步解決方案

從網路基礎設施的角度來看,亞馬遜雲科技 Direct Connect 被用來實現客戶本地和亞馬遜雲科技區域之間的連接。在亞馬遜雲科技環境中,數據流經 Interface Endpoint for S3、亞馬遜雲科技 PrivateLink,最終訪問 S3 存儲桶(如下圖所示,存儲桶名為 ODS(示例))。介面終端節點由通過 Amazon Route53 托管的 DNS 解析器註冊和管理。有關進一步的架構、工作機制說明以及部署指南,請參閱privatelink-interface-endpoints文檔。

從數據傳輸的角度,設計了軟體級雙向數據同步解決方案。包括 3 個子場景:

  1. 存量數據從本地同步到亞馬遜雲科技 Region;
  2. 增量數據從本地同步到亞馬遜雲科技 Region;
  3. 將數據從亞馬遜雲科技 Region 反向同步到本地。

對於每個場景,都有特定的要求:

  1. 數據同步解決方案應該在源是 TiDB、HDFS,目標是亞馬遜雲科技 S3 的上下文中工作;
  2. 需要數據完整性檢查機制,確保數據得到一致同步。

表 1 描述了滿足每個子場景的特定要求的具體的解決方案。

表 1:雲上雲下數據同步解決方案設計

具體要求
數據同步解決方案在一端是 TiDB、HDFS、另一端是亞馬遜雲科技 S3 的環境下工作 數據完整性檢查
子場景 存量數據從本地同步到亞馬遜雲科技 Region TiDB Dumpling 將數據從 TiDB 同步到亞馬遜雲科技 S3 亞馬遜雲科技 DataSync 將數據從本地 HDFS 同步到亞馬遜雲科技 S3 自主研發的數據完整性檢查 亞馬遜雲科技 DataSync 執行完整性檢查以確保寫入目標的數據與從源讀取的數據匹配
增量數據從本地到亞馬遜雲科技區域 Region TiDB Dumpling(帶有 ETL 邏輯)/TiDB CDC Connector 從 TiDB 到亞馬遜雲科技 S3 亞馬遜雲科技 DataSync 將數據從本地 HDFS 同步到亞馬遜雲科技 S3 自主研發的數據完整性檢查 亞馬遜雲科技 DataSync 執行完整性檢查以確保寫入目標的數據與從源讀取的數據匹配
將數據從亞馬遜雲科技區域反向同步到本地 EMR Serverless Job 將數據從 Glue Catalog 表同步到 TiDB 表 自主研發的數據完整性檢查

具體解決辦法解釋如下:

存量數據同步

利用 TiDB Dumpling 將數據從 TiDB 同步到亞馬遜雲科技 S3

如何實現本地 TiDB 數據同步到 AWS S3 ,可以參考導出數據到亞馬遜 S3 雲存儲指導。通過執行以下命令,存儲在 TiDB 中的數據可以轉儲為 csv 文件並存儲在 AWS S3 存儲桶中。

./dumpling -u root -P 4000 -h 127.0.0.1 -r 200000 -o "s3://${Bucket}/${Folder}" –filetype csv

存量數據同步

利用亞馬遜雲科技 DataSync 將數據從本地 HDFS 同步到亞馬遜雲科技 S3

亞馬遜雲科技 DataSync 代理應安裝在客戶本地的伺服器上。連接到 Hadoop 集群時,亞馬遜雲科技 DataSync 代理充當 HDFS 客戶端,與 Hadoop 集群中的主 NameNode 通信,然後從 DataNode 複製文件數據。可以通過亞馬遜雲科技 DataSync獲取該操作指南,將數據從 Hadoop HDFS 同步到 Amazon S3。

增量數據同步

利用 TiDB Dumpling 和自管理的檢查點

為了通過 TiDB Dumpling 工具實現增量數據同步,需要自行管理目標同步數據的檢查點。一種推薦的方法是將最後攝取的記錄的 id 存儲到特定介質(例如 ElastiCache for Redis、DynamoDB)中,以在執行觸發 TiDB Dumpling 的 shell/python 作業時實現自我管理檢查點。

當然,實現該方案的前提是目標表有一個單調遞增的 id 欄位作為主鍵。

對導出的數據進行過濾,可以獲取具體的 TiDB Dumpling 命令。示例命令如下所示。

./dumpling -u root -P 4000 -h 127.0.0.1 -o /tmp/test --where "id < 100"

增量數據同步

利用 TiDB CDC Connector 從 TiDB 到亞馬遜雲科技 S3

利用 TiDB CDC Connector 實現 TiDB 到亞馬遜雲科技 S3 的增量數據同步的好處是有原生的 CDC 機制,而且由於後端引擎是 Flink,所以性能很快。然而,這種方法有一個棘手的點或權衡點:需要創建相當多的 Flink 表來映射亞馬遜雲科技上的 ODS 表。

本 TiDB CDC Connector 操作指南可以通過Tidb CDC 獲取

增量數據同步

利用 EMR Serverless Job 將數據從 Glue Catalog 表反向同步到 TiDB 表

大多數數據從客戶的本地流向亞馬遜雲科技。但是,存在這樣的場景:根據特定業務的需要,數據從亞馬遜雲科技反向流向客戶本地。

數據著落亞馬遜雲科技後,將通過使用特定表結構創建的 Athena 表通過 Glue 數據目錄進行打包/管理。表 DDL 腳本如下所示:

CREATE EXTERNAL TABLE IF NOT EXISTS `table_name`(
  `id` string,
  ……
  `created_at` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 's3://bucket_name/prefix_name/';  

在這種情況下,EMR Serverless Spark Job 可以完成將數據從亞馬遜雲科技 Glue 表反向同步到客戶本地表的工作。

如果 Spark 作業是用 Scala 編寫的,示例代碼如下:

package com.example
import org.apache.spark.sql.{DataFrame, SparkSession}

object Main  {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("<app name>")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show databases").show()
    spark.sql("use default")
    var df=spark.sql("select * from <glue table name>")

    df.write
      .format("jdbc")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://tidbcloud_endpoint:4000/namespace")
      .option("dbtable", "table_name")
      .option("user", "use_name")
      .option("password", "password_string")
      .save()

    spark.close()
  }

}

通過 SBT 將 Scala 代碼打包為 jar 文件後,可以通過以下亞馬遜雲科技 Cli 命令將作業提交到 EMR Serverless 引擎:

export applicationId=00fev6mdk***

export job_role_arn=arn:aws:iam::<aws account id>:role/emr-serverless-job-role

aws emr-serverless start-job-run \
    --application-id $applicationId \
    --execution-role-arn $job_role_arn \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://spark-sql-test-nov23rd/scripts/dec13-1/scala-glue_2.13-1.0.1.jar",
            "sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=4 --conf spark.executor.memory=3g --jars s3://spark-sql-test-nov23rd/mysql-connector-j-8.2.0.jar"
        }
    }'

如果 Spark 作業是用 Pyspark 編寫的,示例代碼如下:

import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession\
        .builder\
        .appName("app1")\
        .enableHiveSupport()\
        .getOrCreate()

    df=spark.sql(f"select * from {str(sys.argv[1])}")

    df.write.format("jdbc").options(
        driver="com.mysql.cj.jdbc.Driver",
        url="jdbc:mysql://tidbcloud_endpoint:4000/namespace ",
        dbtable="table_name",
        user="use_name",
        password="password_string").save()

    spark.stop()

可以通過以下亞馬遜雲科技 CLI 命令將該作業提交到 EMR Serverless 引擎:

export applicationId=00fev6mdk***

export job_role_arn=arn:aws:iam::<aws account id>:role/emr-serverless-job-role

aws emr-serverless start-job-run \
    --application-id $applicationId \
    --execution-role-arn $job_role_arn \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://spark-sql-test-nov23rd/scripts/dec13-1/testpython.py",
            "entryPointArguments": ["testspark"],
            "sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=4 --conf spark.executor.memory=3g --jars s3://spark-sql-test-nov23rd/mysql-connector-j-8.2.0.jar"
        }
    }'

上述 Pyspark 代碼和亞馬遜雲科技 CLI 命令同時實現了外部傳參:提交作業時將表名傳輸到 SQL 語句中。

自研的數據完整性檢查

完備的數據完整性校驗通過在源庫上創建校驗庫,選擇非空唯一欄位計算校驗值和行數,在目標庫上使用與源庫相同的欄位計算校驗值和行數,比較源庫和目標庫的校驗值和行數實現。

如果校驗結果是不一致,那麼需要手動對比和調整。這種校驗方式的前提條件是源庫和目標庫都是關係型資料庫。本文中 TiDB 向亞馬遜雲科技 S3 的數據同步,目標端是對象存儲而不並是資料庫。

因此,數據完整性的檢驗上會有些 trade-off。實戰中,採用對比目標數據集的列總數和行總數,以及列名稱的方式實現。

亞馬遜雲科技 DataSync 數據完整性

DataSync 利用亞馬遜雲科技設計的與其連接的存儲協議無關的傳輸協議,在數據移動時執行實時校驗和驗證。詳細信息可以在configure-data-verification-options獲取。除了實時校驗和驗證之外,DataSync 還支持增量傳輸、內聯壓縮。

DataSync 處理傳輸過程,因此用戶無需編寫和優化自己的複製腳本,也無需部署和微調商業數據傳輸工具。內置監控可確保移動文件和對象的數據完整性,並採用自動重試機制,以便到達目標文件存儲的內容與原始文件匹配。

總結

金融科技客戶非常註重數據安全和合規。為規避潛在的風險,本案例所涉及的客戶的做法是將用戶的出入金數據、用戶的基礎數據(統稱為業務數據)放在 IDC,而用戶的行為數據以及脫敏之後的業務數據放在雲馬遜雲科技平臺中。

從亞馬遜雲科技所服務的全球範圍內的 FSI 行業客戶看,越來越多的金融科技公司選擇將業務數據也存放在亞馬遜雲科技平臺上。亞馬遜雲科技為客戶提供的雲平臺及服務在安全和合規方面積累了非常豐富的認證,包括平臺整體認證、適配所在國家/地區監管法規的認證、行業認證等等;同時亞馬遜雲科技也開發了非常豐富的產品服務幫助客戶應對數據安全合規角度的各種需求。

參考資料

系列博客

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 聲明: 以下內容為個人筆記,內容不完全正確,請謹慎參考。 文本處理工具 cut: cut 工作是“剪”,具體來說就是在文件中負責剪切數據。cut 命令從文件的每個行剪切位元組、字元和欄位輸出。 1、基本語法: cut [選項參數] filename 說明:預設分隔符是副表符 2、選項參數說明 選項參數 ...
  • 雙向迴圈鏈表 原理與應用 雙向迴圈鏈表與雙向鏈表的區別:指的是雙向迴圈鏈表的首結點中的prev指針成員指向鏈表的尾結點,並且雙向迴圈鏈表的尾結點里的next指針成員指向鏈表的首結點,所以雙向迴圈鏈表也屬於環形結構。 雙向迴圈鏈表各功能實現 (1)為了管理雙向迴圈鏈表,需要構造頭結點的數據類型以及構造 ...
  • 1.綜述 本文以HiveSQL語法進行代碼演示。 對於其他資料庫來說同樣也適用,比如SparkSQL,FlinkSQL以及Mysql8,Oracle,SqlServer等傳統的關係型資料庫。 已更新第一類聚合函數類,點擊這裡閱讀 ①SQL視窗函數系列一之聚合函數類 ②SQL視窗函數系列二之分組排序窗 ...
  • 隨著企業數據規模的增長和業務多元化發展,海量數據實時、多維地靈活查詢變成業務常見訴求。同時多套資料庫系統成為常態,這既帶來了數據管理的複雜性,又加大了數據使用的難度,面對日益複雜的數據環境和嚴格的數據安全要求,需要解決多資料庫系統並存、數據孤島嚴重、許可權管理混亂和數據查詢提取困難等問題。與此同時,企 ...
  • Spark 是一個快速、通用、可擴展的大數據計算引擎,具有高性能、易用、容錯、可以與 Hadoop 生態無縫集成、社區活躍度高等優點。在實際使用中,具有廣泛的應用場景: · 數據清洗和預處理:在大數據分析場景下,數據通常需要進行清洗和預處理操作以確保數據質量和一致性,Spark 提供了豐富的 API ...
  • 目錄一、什麼是MongoDB二、MongoDB 與關係型資料庫對比三、數據類型四、部署MongoDB1、下載二進位包2、下載安裝包並解壓3、創建用於存放數據和日誌的目錄,並修改許可權4、啟動MongoDB4.1前臺啟動4.2後臺啟動4.3、配置文件啟動服務4.4、配置systemd服務4.5、syst ...
  • 本文介紹基於Microsoft SQL Server軟體,實現資料庫表的創建、修改、複製、刪除與表數據處理的方法。 目錄1 互動式創建資料庫表T2 互動式創建資料庫表S3 T-SQL創建資料庫表C4 T-SQL創建資料庫表SC5 T-SQL創建資料庫表TC6 互動式向資料庫表S中添加新列NATIVE ...
  • 字元編碼和排序規則 下麵的討論用到W、王和三個字元,以下是這三個字元的各種編碼 先看看不帶N和帶N的字元字面量各用什麼編碼,用Microsoft SQL Server Management Studio連接SQL SERVER 2022執行下麵SQL語句: select N'W' charact ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...