技術乾貨|如何利用 ChunJun 實現數據實時同步?

来源:https://www.cnblogs.com/DTinsight/archive/2023/04/24/17349354.html
-Advertisement-
Play Games

實時同步是 ChunJun 的⼀個重要特性,指在數據同步過程中,數據源與⽬標系統之間的數據傳輸和更新⼏乎在同⼀時間進⾏。 在實時同步場景中我們更加關註源端,當源系統中的數據發⽣變化時,這些變化會⽴即傳輸並應⽤到⽬標系統,以保證兩個系統中的數據保持⼀致。這個特性需要作業運⾏過程中 source 插件不 ...


實時同步是 ChunJun 的⼀個重要特性,指在數據同步過程中,數據源與⽬標系統之間的數據傳輸和更新⼏乎在同⼀時間進⾏。

在實時同步場景中我們更加關註源端,當源系統中的數據發⽣變化時,這些變化會⽴即傳輸並應⽤到⽬標系統,以保證兩個系統中的數據保持⼀致。這個特性需要作業運⾏過程中 source 插件不間斷地頻繁訪問源端。在⽣產場景下,對於這類⻓時間運⾏、資源可預估、需要穩定性的作業,我們推薦使⽤ perjob 模式部署。

插件⽀持 JSON 腳本和 SQL 腳本兩種配置⽅式,具體的參數配置請參考「ChunJun連接器文檔」:https://sourl.cn/vxq6Zp

本文將為大家介紹如何使用 ChunJun 實時同步,以及 ChunJun ⽀持的 RDB 實時採集插件的特性、採集邏輯及其原理,幫助大家更好地理解 ChunJun 與實時同步。

如何使用 ChunJun 實時同步

為了讓⼤家能更深⼊瞭解如何使⽤ ChunJun 做實時同步,我們假設有這樣⼀個場景:⼀個電商⽹站希望將其訂單數據從 MySQL 資料庫實時同步到 HBase 資料庫,以便於後續的數據分析和處理。

在這個場景中,我們將使⽤ Kafka 作為中間消息隊列,以實現 MySQL 和 HBase 之間的數據同步。這樣做的好處是 MySQL 表中變更可以實時同步到 HBase 結果表中,⽽不⽤擔⼼歷史數據被修改後 HBase 表未被同步。

如果在⼤家的實際應用場景中,不關⼼歷史數據是否變更(或者歷史數據根本不會變更),且業務表有⼀個遞增的主鍵,那麼可以參考本⽂之後的 JDBC-Polling 模式⼀節的內容。

· 數據源組件的部署以及 ChunJun 的部署這⾥不做詳細描述

· 案例中的腳本均以 SQL 腳本為例,JSON 腳本也能實現相同功能,但在參數名上可能存在出⼊,使⽤ JSON 的同學可以參考上文 「ChunJun 連接器」⽂檔中的參數介紹

採集 MySQL 數據到 Kafka

● 數據準備

⾸先,我們在 Kafka 中創建⼀個名為 order_dml 的 topic,然後在 MySQL 中創建⼀個訂單表,並插⼊⼀些測試數據。創建表的 SQL 語句如下:

-- 創建⼀個名為ecommerce_db的資料庫,⽤於存儲電商⽹站的數據
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 創建⼀個名為orders的表,⽤於存儲訂單信息
CREATE TABLE IF NOT EXISTS orders (
 id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主鍵
 order_id VARCHAR(50) NOT NULL, -- 訂單編號,不能為空
 user_id INT NOT NULL, -- ⽤戶ID,不能為空
 product_id INT NOT NULL, -- 產品ID,不能為空
 quantity INT NOT NULL, -- 訂購數量,不能為空
 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 訂單⽇期,預設值為當前時間
戳,不能為空
);
-- 插⼊⼀些測試數據到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity) 
VALUES ('ORD123', 1, 101, 2), 
('ORD124', 2, 102, 1), 
('ORD125', 3, 103, 3), 
('ORD126', 1, 104, 1), 
('ORD127', 2, 105, 5);

● 使用 Binlog 插件採集數據到 Kafka

為了表示數據的變化類型和更好地處理數據變化,實時採集插件一般會用 RowData(Flink 內部數據結構)中的 RowKind 記錄⽇志中的數據事件(insert、delete 等)類型,binlog 插件也⼀樣。而當數據被打到 Kafka 中時,RowKind 信息應該怎麼處理呢?

這⾥我們就需要⽤到 upsert-kafka-x,upsert-kafka-x 會識別 RowKind。對各類時間的處理邏輯如下:

• insert 數據:序列化後直接打⼊

• delete 數據:只寫 key,value 置為 null

• update 數據:分為⼀條 delete 數據和 insert 數據處理,即先根據主鍵刪除原本的數據,再寫⼊ update 後的數據

在下⼀步中我們再解釋如何將 Kafka 中的數據還原到 HBase 或者其他⽀持 upsert 語義的資料庫中,接下來我們來編寫 SQL 腳本,實現 MySQL 數據實時採集到 Kafka 中的功能,示例如下:

CREATE TABLE binlog_source ( 
id int, 
order_id STRING, 
user_id INT, 
product_id int, 
quantity int, 
order_date TIMESTAMP(3) 
) WITH ( 
'connector' = 'binlog-x', 
'username' = 'root', 
'password' = 'root', 
'cat' = 'insert,delete,update', 
'url' = 'jdbc:mysql://localhost:3306/ecommerce_db?useSSL=false', 
'host' = 'localhost', 
'port' = '3306', 
'table' = 'ecommerce_db.orders', 
'timestamp-format.standard' = 'SQL', 
'scan.parallelism' = '1' 
); 
CREATE TABLE kafka_sink ( 
id int, 
order_id STRING, 
user_id INT, 
product_id int, 
quantity int, 
order_date TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED 
) WITH ( 
'connector' = 'upsert-kafka-x', 
'topic' = 'orders', 
'properties.bootstrap.servers' = 'localhost:9092', 
'key.format' = 'json', 
'value.format' = 'json', 
'value.fields-include' = 'ALL', 
'sink.parallelism' = '1' 
); 
insert into 
kafka_sink 
select 
* 
from 
binlog_source u;

還原 Kafka 中的數據到 HBase

上述步驟中,我們通過 binlog-x 和 upsert-kafka-x,將 MySQL 中的數據實時採集到了 Kafka 中。解鈴還須系鈴⼈,我們可以通過 upsert-kafka-x 再去將 Kafka 中的數據解析成帶有 upsert 語義的數據。

upsert-kafka-x 作為 source 插件時,會判斷 Kafka 中數據的 value 是否為 null,如果 value 為 null 則標記這條數據的 RowKind 為 DELETE,否則將數據的 ROWKIND 標記為 INSERT。

ChunJun的 hbase-x 插件⽬前已經具備了 upsert 語句的能⼒,使⽤ hbase-x 即可將 Kafka 中的數據還原到 hbase中。接下來是 SQL 腳本示例,為了⽅便在 HBase 中查看數據結果,我們將 int 數據 cast 為 string 類型:

CREATE TABLE kafka_source ( 
id int, 
order_id STRING, 
user_id INT, 
product_id INT, 
quantity INT, 
order_date TIMESTAMP(3), 
PRIMARY KEY (id) NOT ENFORCED 
) WITH ( 
'connector' = 'upsert-kafka-x', 
'topic' = 'orders', 
'properties.bootstrap.servers' = 'localhost:9092', 
'properties.group.id' = 'test_group', 
'key.format' = 'json', 
'value.format' = 'json', 
'scan.parallelism' = '1' 
); 
CREATE TABLE hbase_sink( 
rowkey STRING, order_info ROW < order_id STRING, 
user_id STRING, 
product_id STRING, 
quantity STRING, 
order_date STRING >, 
PRIMARY KEY (rowkey) NOT ENFORCED 
) WITH( 
-- 這⾥以hbase14為例,如果hbase版本是2.x,我們可以使⽤hbase2-x插件代替 
'connector' = 'hbase14-x', 
'zookeeper.quorum' = 'localhost:2181', 
'zookeeper.znode.parent' = '/hbase', 
'table-name' = 'ecommerce_db:orders', 
'sink.parallelism' = '1' 
); 
INSERT INTO 
hbase_sink 
SELECT 
cast(id as STRING), 
ROW( 
cast(order_id as STRING), 
cast(user_id as STRING), 
cast(product_id as STRING), 
cast(quantity as STRING), 
cast(order_date as STRING) 
) 
FROM 
kafka_source

Tips:如果我們不需要 Kafka 中間件,也可以使⽤ binlog-x 插件直接對接 hbase-x 插件。

ChunJun 支持的 RDB 實時採集插件

本節主要介紹 ChunJun 的 RDB 實時採集插件的特性、採集邏輯及其原理。

ChunJun 的 RDB 實時採集可以實時監視資料庫中的更改,併在發⽣更改時讀取數據變化,例如插⼊、更新和刪除操作。使⽤ ChunJun 實時採集,我們可以實時獲取有關資料庫中更改的信息,從⽽能夠及時響應這些更改,如此便可以幫助我們更好地管理和利⽤ RDB 資料庫中的數據。

並且 ChunJun 提供了故障恢復和斷點續傳功能來確保數據的完整性。ChunJun 實時採集類插件的⼤致實現步驟如下:

· 連接資料庫,確認讀取點位,讀取點位可以理解為⼀個 offset,如 Binlog 中,指⽇志的⽂件名和⽂件的 position 信息

· 根據讀取點位開始讀取 redolog,獲取其中關於數據變更相關的操作記錄

· 根據 tableName、操作事件(如insert、delete、update)等過濾信息過濾出需要的 log ⽇志

· 解析 log ⽇志,解析後的事件信息包括表名、資料庫名、操作類型(插⼊、更新或刪除)和變更的數據⾏等

· 將解析出來的數據會加⼯為 ChunJun 內部統⼀的 DdlRowData 供下游使⽤

ChunJun ⽬前已⽀持的實時採集 Connector 有:binlog(mysql)、oceanbasecdc、oraclelogminer、sqlservercdc。

Binlog 簡介

ChunJun binlog 插件的主要功能是讀取 MySQL 的⼆進位⽇志(binlog)⽂件。這些⽂件記錄了所有對數據的更改操作,如插⼊、更新和刪除等。⽬前,該插件依賴 Canal 組件來讀取 MySQL 的 binlog ⽂件。

核⼼操作步驟如下:

• 確認讀取點位:在 binlog 插件中,我們可以在腳本的 start 欄位中直接指定 journal-name(binlog ⽂件名)和 position(⽂件的特定位置)

• 讀取binlog:binlog 插件將⾃身偽裝成 MySQL 的 Slave 節點,向 MySQL Master 發送請求,要求將 binlog ⽂件的數據流發送給它

• 故障恢復和斷點續傳:故障時,插件會記錄當前的 binlog 位置信息,從 checkpoint/savepoint 恢復後,我們可以從上次記錄的位置繼續讀取 binlog ⽂件,確保數據變化的完整性

使⽤ binlog 所需的許可權在「binlog插件使⽤⽂檔」中有詳細說明,鏈接如下:

https://sourl.cn/mvae9m

OracleLogminer 簡介

Logminer 插件藉助 Oracle 提供的 Logminer ⼯具通過讀取視圖的⽅式獲取 Oracle redolog 中的信息。

核⼼操作步驟如下:

01 定位需讀取起始點位(start_scn)

⽬前 logminer ⽀持四種策略指定 StartScn:

· all:從 Oracle 資料庫中最早的歸檔⽇志組開始採集(不建議使⽤)

· current:任務運⾏時的 SCN 號

· time:指定時間點對應的 SCN 號

· scn:直接指定 SCN 號

02 定位需要讀取的結束點位(end_scn)

插件根據 start_scn 和 maxLogFileSize(預設5G)獲取可載入的 redolog ⽂件列表,end_scn 取這個⽂件列表中最⼤的 scn 值。

03 載入 redo ⽇志到 Logminer

通過⼀個存儲過程,將 scn 區間範圍內的 redolog 載入到 Logminer ⾥。

04 從視圖中讀取數據

以 scn > ? 作為 where 條件直接查詢 v$logmnr_contents 視圖內的信息即可獲取 redolog 中的數據。

05 重覆1-4步驟,實現不斷的讀取

如標題。

06 故障恢復和斷點續傳

在發⽣故障時,插件會保存當前消費的 scn 號,重啟時從上次的 scn 號開始讀取,確保數據完整性。

• 關於該插件原理的詳細介紹請參⻅「Oracle Logminer 實現原理說明⽂檔」:

https://sourl.cn/6vqz4b

• 使⽤lominer插件的前提條件詳⻅「Oracle配置LogMiner」:

https://sourl.cn/eteyZY

SqlServerCDC 簡介

SqlServerCDC 插件依賴 SQL Server 的 CDC Agent 服務提供的視圖獲取 redolog 中的信息。

核⼼操作步驟如下:

01 定位需讀取起始點位(from_lsn)

⽬前 SqlserverCDC 僅⽀持直接配置 lsn 號,如果 lsn 號未配置,則取資料庫中當前最⼤的 lsn 號為 from_lsn。

02 定位需要讀取的結束點位(to_lsn)

SqlserverCDC 插件定期地(可通過 pollInterval 參數指定)獲取資料庫中的最⼤ lsn 為 end_lsn。

03 從視圖中讀取數據

查詢 Agent 服務提供的視圖中 lsn 區間範圍內的數據,過濾出需要監聽的表及事件類型。

04 重覆1-3步驟,實現不斷的讀取

如標題。

05 故障恢復和斷點續傳

在發⽣故障時,插件會保存當前消費的 lsn 號。重啟時從上次的 lsn 號開始讀取,確保數據完整性。

• 關於該插件原理的詳細介紹請參⻅「Sqlserver CDC 實現原理說明⽂檔」:

https://sourl.cn/5pQvEM

• 配置 SqlServer CDC Agent 服務詳⻅「Sqlserver 配置 CDC ⽂檔」:

https://sourl.cn/h5nd8j

OceanBaseCDC 簡介

OceanBase 是螞蟻集團開源的⼀款分散式關係型資料庫,它使⽤⼆進位⽇志(binlog)記錄數據變更。OceanBaseCDC 的實現依賴於 OceanBase 提供的 LogProxy 服務,LogProxy 提供了基於發佈-訂閱模型的服務,允許使⽤ OceanBase 的 logclient 訂閱特定的 binlog 數據流。

OceanBaseCDC 啟動⼀個 Listener 線程。當 logclient 連接到 LogProxy 後,Listener 會訂閱經過數據過濾的 binlog,然後將其添加到內部維護的列表中。當收到 COMMIT 信息後,Listener 會將⽇志變更信息傳遞給⼀個阻塞隊列,由主線程消費並將其轉換為 ChunJun 內部的 DdlRowData,最終發送到下游。

JDBC-Polling 模式讀

JDBC 插件的 polling 讀取模式是基於 SQL 語句做數據讀取的,相對於基於重做⽇志的實時採集成本更低,但 jdbc 插件做實時同步對業務場景有更⾼的要求:

· 有⼀個數值類型或者時間類型的遞增主鍵

· 不更新曆史數據或者不關⼼歷史數據是否更新,僅關⼼新數據的獲取

實現原理簡介

• 設置遞增的業務主鍵作為 polling 模式依賴的增量鍵

• 在增量讀取的過程中,實時記錄 increColumn 對應的值(state),作為下⼀次數據讀取的起始點位

• 當⼀批數據讀取完後,間隔⼀段時間之後依據 state 讀取下⼀批數據

polling 依賴部分增量同步的邏輯,關於增量同步的更多介紹可以點擊:

https://sourl.cn/UC8n6K

如何配置⼀個 jdbc-polling 作業

先介紹⼀下開啟 polling 模式需要關註的配置項:

file

以 MySQL 為例,假設我們有⼀個存儲訂單信息的歷史表,且訂單的 order_id 是遞增的,我們希望定期地獲取這張表的新增數據。

CREATE TABLE order.realtime_order_archive ( 
order_id INT PRIMARY KEY COMMENT '訂單唯⼀標識', 
customer_id INT COMMENT '客戶唯⼀標識', 
product_id INT COMMENT '產品唯⼀標識', 
order_date TIMESTAMP COMMENT '訂單⽇期和時間', 
payment_method VARCHAR(255) COMMENT '⽀付⽅式(信⽤卡、⽀付寶、微信⽀付等)', 
shipping_method VARCHAR(255) COMMENT '配送⽅式(順豐速運、圓通速遞等)', 
shipping_address VARCHAR(255) COMMENT '配送地址', 
order_total DECIMAL(10,2) COMMENT '訂單總⾦額', 
discount DECIMAL(10,2) COMMENT '折扣⾦額', 
order_status VARCHAR(255) COMMENT '訂單狀態(已完成、已取消等)' 
); 

我們可以這樣配置 json 腳本的 reader 信息。

"name": "mysqlreader", 
"parameter": { 
"column" : [ 
"*" //這⾥假設我們讀取所有欄位,可以填寫‘*’ 
], 
"increColumn": "id", 
"polling": true, 
"pollingInterval": 3000, 
"username": "username", 
"password": "password", 
"connection": [ 
{ 
"jdbcUrl": [ 
"jdbc:mysql://ip:3306/liuliu?useSSL=false" 
], 
"schema":"order", 
"table": [ 
"realtime_order_archive" ] 
} 
] 
} 
}

《數棧產品白皮書》:https://fs80.cn/cw0iw1

《數據治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一款輕量級、高性能、功能強大的內網穿透代理伺服器。支持tcp、udp、socks5、http等幾乎所有流量轉發,並帶有功能強大的web管理端。 ...
  • 記錄 mongo 資料庫用原生自帶的命令工具使用 json 文件方式進行導入、導出的操作! 在一次數據更新中,同事把老數據進行了清空操作,但是新的邏輯數據由於某種原因(好像是她的電腦中病毒了),一直無法正常連接資料庫進行數據插入,然後下午2點左右要給甲方演示,所以要緊急恢複本地的部分數據到生產庫。 ...
  • 功能03-優惠券秒殺01 4.功能03-優惠券秒殺 4.1全局唯一ID 4.1.1全局ID生成器 每個店鋪都可以發佈優惠券: 當用戶搶購時,就會生成訂單,並保存到tb_voucher_order這張表中。訂單表如果使用資料庫的自增id就存在一些問題: id的規律性太明顯:用戶可以根據id猜測一些信息 ...
  • 這裡介紹一下如何在Zabbix 6下麵,使用預設自帶的模板MSSQL by ODBC來監控SQL Server資料庫。官方關於Template DB MSSQL By ODBC的介紹如下鏈接所示: https://www.zabbix.com/integrations/mssql 這個項目對應的gi ...
  • 1、通過慢查日誌等定位那些執行效率較低的SQL語句 2、explain 分析SQL的執行計劃 需要重點關註type、rows、filtered、extra。 type由上至下,效率越來越高 ALL 全表掃描 index 索引全掃描 range 索引範圍掃描,常用語<,<=,>=,between,in ...
  • 4月22日,2023首屆雲資料庫技術沙龍 MySQL x ClickHouse 專場,在杭州市海智中心成功舉辦。本次沙龍由玖章算術、菜根發展、良倉太炎共創聯合主辦。圍繞“技術進化,讓數據更智能”為主題,匯聚位元組跳動、阿裡雲、玖章算術、華為雲、騰訊雲、百度的6位資料庫領域專家,深入 MySQL x C... ...
  • 摘要:“銀行業數字化轉型實踐交流會”杭州站順利收官。 由華為與北京先進數通聯合主辦的“銀行業數字化轉型實踐交流會”杭州站順利收官,會議邀請了金融科技先鋒企業、機構和多位資深專家,一起深入交流銀行業數字化轉型業務場景的探索和實踐。其中,華為雲資料庫專家在現場分享了華為雲GaussDB的前沿技術和項目實 ...
  • 實驗一、Flume 組件安裝配置 1、下載和解壓 Flume 可 以 從 官 網 下 載 Flume 組 件 安 裝 包 , 下 載 地 址 如 下 URL 鏈 接 所 示 https://archive.apache.org/dist/flume/1.6.0/ [root@master ~]# l ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...