技術乾貨|如何利用 ChunJun 實現數據離線同步?

来源:https://www.cnblogs.com/DTinsight/archive/2023/05/19/17414282.html
-Advertisement-
Play Games

ChunJun 是⼀款穩定、易⽤、⾼效、批流⼀體的數據集成框架,基於計算引擎 Flink 實現多種異構數據源之間的數據同步與計算。ChunJun 可以把不同來源、格式、特點性質的數據在邏輯上或物理上有機地集中,從⽽為企業提供全⾯的數據共用,目前已在上千家公司部署且穩定運⾏。 在之前,我們曾經為大家介 ...


ChunJun 是⼀款穩定、易⽤、⾼效、批流⼀體的數據集成框架,基於計算引擎 Flink 實現多種異構數據源之間的數據同步與計算。ChunJun 可以把不同來源、格式、特點性質的數據在邏輯上或物理上有機地集中,從⽽為企業提供全⾯的數據共用,目前已在上千家公司部署且穩定運⾏。

在之前,我們曾經為大家介紹過如何利用 ChunJun 實現數據實時同步(點擊看正文),本篇將為大家介紹姊妹篇,如何利⽤ ChunJun 實現數據的離線同步。

ChunJun 離線同步案例

離線同步是 ChunJun 的⼀個重要特性,下⾯以最通⽤的 mysql -> hive 的同步任務來介紹離線同步。

配置環境

找⼀個空⽬錄,接下來要配置 Flink 和 ChunJun 的環境,下⾯以 /root/chunjun_demo/ 為例⼦。

● 配置 Flink

下載 Flink

wget "http://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"
tar -zxvf chunjun-dist.tar.gz

● 配置 ChunJun

#下載 chunjun, 內部依賴 flink 1.12.7
wget https://github.com/DTStack/chunjun/releases/download/v1.12.8/chunjun-dist-1.12-SNAPSHOT.tar.gz
#新創建⼀個⽬錄
mkdir chunjun && cd chunjun
#解壓到指定⽬錄
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz

解壓好的 ChunJun 有如下⽬錄:
bin
chunjun-dist
chunjun-examples
lib

● 配置環境變數

#配置 Flink 環境變數
echo "FLINK_HOME=/root/chunjun_demo/flink-1.12.7" >> /etc/profile.d/sh.local
#配置 Chunjun 的環境變數
echo "CHUNJUN_DIST=/root/chunjun_demo/chunjun/chunjun-dist" >> /etc/profile.d/sh.local
#刷新換新變數
. /etc/profile.d/sh.local

● 在 Yarn 上⾯啟動 Flink Session

#啟動 Flink Session
bash $FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_DIST -d

輸出如下:

echo "stop" | $FLINK_HOME/bin/yarn-session.sh -id application_1683599622970_0270
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
yarn application -kill application_1683599622970_0270

下⾯提交任務會⽤到 Flink Session 這個 Yarn Application Id (application_1683599622970_0270)。

● 其他配置

如果⽤ parquet 格式,需要把 flink-parquet_2.12-1.12.7.jar 放⼊到 flink/lib 下⾯, 在上⾯的例⼦中,需要放到 $FLINK_HOME/lib ⾥⾯。

file

提交任務

● 在 MySQL 準備數據

-- 創建⼀個名為ecommerce_db的資料庫,⽤於存儲電商⽹站的數據
CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 創建⼀個名為orders的表,⽤於存儲訂單信息
CREATE TABLE IF NOT EXISTS orders (
 id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主鍵
 order_id VARCHAR(50) NOT NULL, -- 訂單編號,不能為空
 user_id INT NOT NULL, -- ⽤戶ID,不能為空
 product_id INT NOT NULL, -- 產品ID,不能為空
 quantity INT NOT NULL, -- 訂購數量,不能為空
 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
-- 訂單⽇期,預設值為當前時間戳,不能為空
);
-- 插⼊⼀些測試數據到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
       ('ORD124', 2, 102, 1),
       ('ORD125', 3, 103, 3),  
       ('ORD126', 1, 104, 1),
       ('ORD127', 2, 105, 5);
       
select * from chunjun.orders;       

如果沒有 MySQL 的話,可以⽤ docker 快速創建⼀個。

docker pull mysql:8.0.12
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.12

● 創建 Hive 表

CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 創建⼀個名為orders的表,⽤於存儲訂單信息
CREATE TABLE IF NOT EXISTS chunjun.orders (
 id INT,
 order_id VARCHAR(50),
 user_id INT,
 product_id INT,
 quantity INT,
 order_date TIMESTAMP
)
 STORED AS PARQUET;
-- 查看 hive 表,底層的 HDFS ⽂件位置,下⾯的 SQL 結果⾥⾯ Location 欄位,就是 HDFS ⽂件的位置。
desc formatted chunjun.orders;
-- Location: hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders
-- ⼀會配置同步任務的時候會⽤到 hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders

● 在當前⽬錄( /root/chunjun_demo/ ) 配置⼀個任務 mysql_hdfs.json

vim mysql_hdfs.json 輸⼊如下內容:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
 }
 ],
"username": "root",
"password": "123456",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ]
 },
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ],
"writeMode": "overwrite",
"encoding": "utf-8",
"fileType": "parquet",
"fullColumnName":
 [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"],
"fullColumnType":
 [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
 },
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {
"record": 0
 },
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

因為我們要將 MySQL 同步到 Hive ⾥⾯,但是如果直接同步 Hive 的話,內部會⽤ jdbc,⽽ jdbc 的效率不⾼,因此我們可以直接把數據同步到 Hive 底層的 HDFS 上⾯,所以 writer ⽤到了 hdfswriter。腳本解析如下:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connectionComment": "資料庫鏈接, 資料庫, 表, 賬號, 密碼",
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
 }
 ],
"username": "root",
"password": "123456",
"columnComment": "要同步的列選擇, 可以選擇部分列",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ]
 },
"nameComment" : "source 是 mysql",
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"pathComment": "HDFS 上⾯的路徑, 通過 hive 語句的 desc formatted 查看",
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfigComment": "是 hdfs ⾼可⽤最基本的配置, 在 Hadoop 配置⽂件 hdfs-site.xml 可以找到",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"columnComment": "要同步的列選擇, 可以選擇部分列",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ],
"writeModeComment": "覆蓋寫⼊到 hdfs 上⾯的⽂件, 可選 overwrite, append(預設模式)",
"writeMode": "overwrite",
"encoding": "utf-8",
"fileTypeComment": "可選 orc, parquet, text",
"fileType": "parquet",
"fullColumnNameComment": "全部欄位,有時候 column ⾥⾯同步部分欄位,但是⼜需要有全部欄位的格式,例如 fileType : text ",
"fullColumnName": [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"], 
"fullColumnTypeComment": "全部欄位的類型",
"fullColumnType": [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
 },
"nameComment" : "sink 是 hdfs",
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {
"record": 0
 },
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

● 提交任務

bash chunjun/bin/chunjun-yarn-session.sh -job mysql_hdfs.json -confProp
{\"yarn.application.id\":\"application_1683599622970_0270\"}

● 查看任務

file
file

任務同步完成, 可以看⼀下 HDFS 上⾯的數據。

file

查看⼀下 Hive 表的數據。

file

註意, 如果是分區的 Hive 表,需要⼿動刷新⼀下 Hive 的元數據, 使⽤ MSCK 命令。(MSCK 是 Hive 中的⼀個命令,⽤於檢查表中的分區,並將其添加到 Hive 元數據中)

MSCK REPAIR TABLE my_table;

ChunJun 離線同步原理解析

HDFS 文件同步原理

· 對於⽂件系統,同步的時候會先把⽂件寫⼊到 path + [filename] ⽬錄⾥⾯的 .data 的⽂件⾥⾯,如果任務失敗,那麼 .data ⾥⾯的⽂件不會⽣效。

· 在 TaskManager 上⾯所有 task 任務結束的時候,會在 JobManager 執⾏ FinalizeOnMaster 的 finalizeGlobal ⽅法, 最終會調⽤到 moveAllTmpDataFileToDir , 把 .data ⾥⾯的⽂件移除到 .data 的上⼀層。

public interface FinalizeOnMaster {

/**
The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished.
Params:parallelism – The parallelism with which the format or functions was run.
Throws:IOException – The finalization may throw exceptions, which may cause the job to abort.
*/
void finalizeGlobal(int parallelism) throws IOException; 
}
// 在 JobManager 執⾏
@Override
protected void moveAllTmpDataFileToDir() {
if (fs == null) {
openSource();
 }
String currentFilePath = "";
try {
Path dir = new Path(outputFilePath);
Path tmpDir = new Path(tmpPath);

FileStatus[] dataFiles = fs.listStatus(tmpDir);
for (FileStatus dataFile : dataFiles) {
currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
 }
fs.delete(tmpDir, true);
 } catch (IOException e) {
throw new ChunJunRuntimeException(
String.format(
"can't move file:[%s] to dir:[%s]", currentFilePath, outputFilePath),
e);
 }
}

增量同步

增量同步主要針對某些只有 Insert 操作的表,隨著業務增⻓,表內數據越來越多。如果每次都同步整表的話,消耗的時間和資源會⽐較多。因此需要⼀個增量同步的功能,每次只讀取增加部分的數據。

● 實現原理

其實現原理實際上就是配合增量鍵在查詢的 sql 語句中拼接過濾條件,⽐如 where id > ? ,將之前已經讀取過的數據過濾出去。

增量同步是針對於兩個及以上的同步作業來說的。對於初次執⾏增量同步的作業⽽⾔,實際上是整表同步,不同於其他作業的在於增量同步作業會在作業執⾏完成後記錄⼀個 endLocation 指標,並將這個指標上傳到 prometheus 以供後續使⽤。

除第⼀次作業外,後續的所有增量同步作業都會取上⼀次作業的 endLocation 做為本次作業的過濾依據(startLocation)。⽐如第⼀次作業執⾏完後,endLocation 為10,那麼下⼀個作業就會構建出例如 SELECT id,name,age from table where id > 10 的 SQL 語句,達到增量讀取的⽬的。

● 使用限制

· 只有 RDB 的 Reader 插件可以使⽤

· 通過構建SQL過濾語句實現,因此只能⽤於RDB插件

· 增量同步只關⼼讀,不關⼼寫,因此只與Reader插件有關

· 增量欄位只能為數值類型和時間類型

· 指標需要上傳到 prometheus,⽽ prometheus 不⽀持字元串類型,因此只⽀持數據類型和時間類型,時間類型會轉換成時間戳後上傳

· 增量鍵的值可以重覆,但必須遞增

· 由於使⽤ '>' 的緣故,要求欄位必須遞增

斷點續傳

斷點續傳是為了在離線同步的時候,針對⻓時間同步任務如超過1天,如果在同步過程中由於某些原因導致任務失敗,從頭再來的話成本⾮常⼤,因此需要⼀個斷點續傳的功能從任務失敗的地⽅繼續。

● 實現原理

· 基於 Flink 的 checkpoint,在 checkpoint 的時候 會存儲 source 端最後⼀條數據的某個欄位值,sink 端插件執⾏事務提交。

· 在任務失敗,後續通過 checkpoint 重新運⾏時,source 端在⽣成 select 語句的時候將 state ⾥的值作為條件拼接進⾏數據的過濾,達到從上次失敗位點進⾏恢復。

file
· jdbcInputFormat 在拼接讀取 SQL 時,如果從 checkpoint 恢復的 state 不為空且 restoreColumn 不為空,則此時會將 checkpoint ⾥的 state 作為起點開始讀取數據。

● 適用場景

通過上述原理我們可以知道 source 端必須是 RDB 類型插件,因為是通過 select 語句拼接 where 條件進⾏數據過濾達到斷點續傳的,同時斷點續傳需要指定⼀個欄位作為過濾條件,且此欄位要求是遞增的。

· 任務需要開啟 checkpoint

· reader 為 RDB 的插件均⽀持且 writer ⽀持事務的插件(如 rdb filesystem 等),如果下游是冪等性則 writer 插件也不需要⽀持事務

· 作為斷點續傳的欄位在源表⾥的數據是遞增的,因為過濾條件是 >

《數棧產品白皮書》:https://www.dtstack.com/resources/1004?src=szsm

《數據治理行業實踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=szsm

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 哈嘍大家好,我是鹹魚 隨著互聯網技術的發展,分散式架構越來越被人們所採用。在分散式架構下,**為了實現複雜的業務邏輯,應用程式需要分散式通信實現遠程調用** 而這時候就需要一種協議來支持遠程過程調用,以便實現不同應用程式之間的數據交換和信息傳遞。其中常用的協議包括 HTTP 協議和 RPC 協議 H ...
  • > 本文時間 2023-05-19 > 作者:sugerqube漆瓷 本文重理解,!!忽略環境變數載入原理!! **本文目標**:理解六大環境變數配置,選擇合適的配置文件進行配置 # 配置環境的理由 以**shell**編程為例 ![image](https://img2023.cnblogs.co ...
  • 目錄 一、查看網卡 二、創建虛擬網卡 三、修改主機名 四、管理路由表 五、管理埠 六、網路通信 七、配置網卡 一、查看網卡 命令:ifconfig :查看當前可用設備 主要查看ip地址和mac地址 ifconfig +設備名:表示查看指定設備狀態 ifconfig選項:-a表示查看所有設備(包含沒 ...
  • # 1. 介紹 事情是這樣的,UAT 環境的測試小伙伴向我扔來一個小 bug,說是一個放大鏡的查詢很慢,轉幾分鐘才出數據,我立馬上開發環境試了一下,很快啊我說😏,放大鏡的數據立馬就出來了,然後我登錄 UAT 環境一看,誒是有些慢😕 ,於是開始了我的排查之旅... # 2. 過程 首先我立馬拿到了 ...
  • 資料庫(Database)中的預設欄位(也稱為預設欄位),就是在一般情況下,每個數據表(Table)必須包含的欄位(Field),這類欄位用於滿足特定的數據需求,欄位值的填充或更改一般遵照一定的邏輯要求。預設欄位的設計應該考慮到數據的完整性和一致性,以確保數據的正確與可靠,設計合理的表欄位對於數據的 ...
  • > 本文首發於公眾號:Hunter後端 > 原文鏈接:[es筆記二之基礎查詢](https://mp.weixin.qq.com/s/VW0QCuW-ONEH-TRB2WF4GQ) 這一篇筆記介紹 es 的基礎查詢。 基礎查詢包括很多,比如排序,類似資料庫 limit 的操作,like 操作,與或非 ...
  • **本系列為:MySQL資料庫詳解,為千鋒教育資深教學老師獨家創作** **致力於為大家講解清晰MySQL資料庫相關知識點,含有豐富的代碼案例及講解。如果感覺對大家有幫助的話,可以【關註】持續追更\~** **文末有本文重點總結,技術類問題,也歡迎大家和我們溝通交流!** ![在這裡插入圖片描述]( ...
  • 摘要:有一種數據泄露的死敵,叫全密態! 本文分享自華為雲社區《這年頭怕數據泄露?全密態資料庫:無所謂,我會出手》,作者:GaussDB 資料庫。 弔炸天的全密態資料庫,到底是個啥? 藏不住了,這全密態資料庫真上頭! 有一種數據泄露的死敵,叫全密態! 數據被標價售賣 莫名其妙接到詐騙電話 企業數據泄露 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...