開源共建 | Dinky 擴展批流統一數據集成框架 ChunJun 的實踐分享

来源:https://www.cnblogs.com/DTinsight/archive/2022/11/18/16902447.html
-Advertisement-
Play Games

一、前言 ChunJun(原FlinkX)是一個基於 Flink 提供易用、穩定、高效的批流統一的數據集成工具,既可以採集靜態的數據,比如 MySQL,HDFS 等,也可以採集實時變化的數據,比如 binlog,Kafka等。同時 ChunJun 也是一個支持原生 FlinkSql所有語法和特性的計 ...


一、前言

ChunJun(原FlinkX)是一個基於 Flink 提供易用、穩定、高效的批流統一的數據集成工具,既可以採集靜態的數據,比如 MySQL,HDFS 等,也可以採集實時變化的數據,比如 binlog,Kafka等。同時 ChunJun 也是一個支持原生 FlinkSql所有語法和特性的計算框架。

ChunJun 具有豐富的插件種類,多達40種,如常見的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及維表功能。目前很多用戶在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文將帶來如何在 Dinky 上集成 ChunJun 豐富的插件,其實簡單,那我們開始吧。

二、部署 Flink+ChunJun

編譯

註意,如果需要集成 Dinky,需要將 ChunJun項目下的 chunjun-core 的pom 文件中的 logback-classic 和 logback-core 註釋掉,否則容易在 Dinky 執行 sql 任務的時候報錯。

file
然後執行:

file

部署

使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指導。

值得註意的是,如果你需要調用 Flinkx 的 connect jar 的話,則需要將 classloader.resolve-order 改成 parent-first。修改完成配置以後,把 Flinkx 的 jar 包複製過來,主要是 chunjun-clients-master.jar(Flinkx 現在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目錄下,如圖所示。
file

異常處理

如果啟動集群時出現異常,即 Flink standalone 集群載入 flinkx-dist 里 jar 包之後,集群無法啟動,日誌報錯:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY
at org.apache.logging.log4j.core.config.ConfigurationSource.(ConfigurationSource.java:56)
at org.apache.logging.log4j.core.config.NullConfiguration.< init>(NullConfiguration.java:32)
at org.apache.logging.log4j.core.LoggerContext.< clinit>(LoggerContext.java:85)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.log4j.LogManager.< clinit>(LogManager.java:72)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.< clinit>(ClusterEntrypoint.java:107)

原因:這個報錯是因為 log4j 版本不統一導致的,因為 flinkx-dist 中部分插件引用的還是舊版本的 log4j 依賴,導致集群啟動過程中,出現了類衝突問題;

方案:臨時方案是將 flink lib 中 log4j 相關的jar包名字前加上字元 ‘a‘,使得flink standalone jvm 優先載入。
file

三、部署 Dinky

編譯

file
編譯完成後的壓縮包在 Dinky 根目錄下的 build 文件夾下。

部署

1、上傳dlink壓縮包到部署伺服器

2、解壓

file
3、資料庫初始化

4、把 flink 的 jar 放到 dlink 目錄下

file

因為目前 flinkx 的穩定版本是 1.12.7,所以我們把 dlink 預設的 client 版本修改為 1.12

file
lib下的目錄如圖:

file
註意:因為我沒有用上 dlink-connector-jdbc 的 jar 包,所以圖中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 沒有換成1.12版本的,可以去掉。

啟動

啟動命令
file

註冊集群實例

在集群實例中註冊已經啟動的 Flink 集群。
file

四、示例分享

添加依賴

這裡演示 mysql->mysql 的同步作業,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。
file

編寫作業

Mysql DDL:

CREATE TABLE datasource_classify (
id int unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
classify_code varchar(64) NOT NULL COMMENT '類型欄唯一編碼',
sorted int NOT NULL DEFAULT '0' COMMENT '類型欄排序欄位 預設從0開始',
classify_name varchar(64) NOT NULL COMMENT '類型名稱 包含全部和常用欄',
is_deleted tinyint NOT NULL DEFAULT '0' COMMENT '是否刪除,1刪除,0未刪除',
gmt_create datetime DEFAULT CURRENT_TIMESTAMP,
gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY classify_code (classify_code)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='數據源分類表';

CREATE TABLE source
(
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,'scan.fetch-size' = '2'
,'scan.query-timeout' = '10'
);

CREATE TABLE sink
(
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,'scan.fetch-size' = '2'
,'scan.query-timeout' = '10'
);

insert into sink
select *
from source u;

執行任務

file
選中 Yarn Session 模式提交作業。
file
提交後可從執行歷史查看作業提交狀況。
file
進程中可以看的 Flink 集群上批作業執行完成。

對比數據

源庫:
file
目標庫:
file
同步成功,很絲滑。

五、總結

在集成 ChunJun 的時候遇到的問題大部分都是缺包以及包衝突,所以只需要註意一下這個問題就能比較好的進行集成。

在集成服務的時候建議是,先把 Flink 和 ChunJun 進行集成,確保服務能夠正常啟用以後再進行 Dinky 的集成,這樣有利於快速定位查找問題,如果遇到文章之外的問題,也可以查看 Dinky 官網FAQ | Dinky (dlink.top) chunjun的官網QuickStart | ChunJun 純鈞 (dtstack.github.io/chunjun/),看看是否有類似問題的解決辦法作為參考。

六、用戶體驗

因為本人目前還是處於學習使用的過程中,所以很多功能沒有好好使用,待自己研究更加透徹後希望寫一篇文章,優化官網的用戶手冊。以下的優缺點以及建議都是目前我在使用學習的過程中遇到的問題。

優點

Dinky 最吸引我的地方應該就是 sql 編輯模版了,直接快捷鍵生成 sql 模版,在開發測試中屢試不爽。在集成了 ChunJun(Flinkx) 以後,能夠做到多源數據的離線跑批任務及日常小批量實時任務的同步。支持各種類型的任務執行方式。

缺點

ui 上適配還有點小問題,例如:打開 F12 調整寬度後,再關閉,頁面 ui 不會自適應,需要刷新。

期待改進點

1、更多的自定義異常、業務異常

2、增加新的嚮導模式,結合數據源,通過 webUI 可以一鍵引入欄位或者勾選需要的欄位,生成 Flink Sql 的一大部分配置

CREATE TABLE 表名
(
-- 頁面勾選欄位,欄位從元數據直接拉取
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- 從選擇的數據中獲取
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,
-- 其它非主要配置有用戶自己填寫
);
3、sql 歷史版本管理,目前我已經提交 Feature 並被合併到 0.6.5 版本中。

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack/Taier


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、關鍵字和保留字 1.什麼是關鍵字? 通俗來說就是帶有特殊含義的字元,有自己專門用途的單詞 2.特點? 關鍵字全部由小寫構成,以下是java官方列舉出的關鍵字 註意: 保留關鍵字:指的是現有版本中尚未使用,後續可能會作為關鍵字使用,自己命名標識符要避免使用這些關鍵字,並且Java 的 null 不 ...
  • 前提:最近公司下發通知,所有開發人員 必須 卸載 Navicat 資料庫可視化工具,不知道兄弟們有沒有在使用的,可能現在的反應跟我一樣,一臉懵逼,Navicat為什麼不能使用呢? 有事沒事找度娘,於是我。。 (簡單粗暴) 搜到了答案。。 原來如此,那就廢話不多說,這邊介紹 使用 DBeaver 來替 ...
  • 作用:隨時修改代碼 (在函數或類定義完成之後,再去修改函數的實現過程) """類似猴子補丁在函數定義好之後,再去更改他的行為"""import typesclass Valley: def func(self): return "等待宣告"def common(self): return "只有永不 ...
  • 1. 擴容方案剖析 1.1 擴容問題 在項目初期,我們部署了三個資料庫A、B、C,此時資料庫的規模可以滿足我們的業務需求。為了將數據做到平均分配,我們在Service服務層使用uid%3進行取模分片,從而將數據平均分配到三個資料庫中。 如圖所示: 後期隨著用戶量的增加,用戶產生的數據信息被源源不斷的 ...
  • 上文介紹了命令行方式來對文件進行加解密操作。本文將繼續在此基礎上,實現一個快速簡易的GUI界面方便操作,先上代碼看效果。 ...
  • aspnetcore上傳圖片也就是上傳文件有兩種方式,一種是通過form-data,一種是binary。 先介紹第一種form-data: 該方式需要顯示指定一個IFormFile類型,該組件會動態通過打開一個windows視窗選擇文件 及圖片。 postman演示如上,代碼如下: [HttpPos ...
  • FreeRtos操作系統 首先,應該介紹什麼是FreeRtos,他於單片機而言就是一個管理器,作為管理者管理嵌入式晶元中的任務,堆棧,中斷,隊列等等資源,對於操作系統而言,又分為實時操作系統和非實時操作系統,實時操作系統代表任務或者某個功能必須在指定的運行時間內完成,保證設備想要執行的功能能立即得到 ...
  • 摘要:先通過OPS確認節點狀態是否已經恢復,或登錄後臺執行cm_ctl query -Cv確認集群是否已經Normal。 本文分享自華為雲社區《【實例狀態】GaussDB CN服務異常》,作者:酷哥。 確認節點狀態 先通過OPS確認節點狀態是否已經恢復,或登錄後臺執行cm_ctl query -Cv ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...