目錄簡介工作流程核心架構核心模塊介紹DataX調度流程支持的數據實踐下載環境執行流程引用 簡介 DataX是一個數據同步工具,可以將數據從一個地方讀取出來並以極快的速度寫入另外一個地方。常見的如將mysql中的數據同步到另外一個mysql中,或者另外一個mongodb中。 工作流程 read:設置一 ...
目錄
簡介
DataX是一個數據同步工具,可以將數據從一個地方讀取出來並以極快的速度寫入另外一個地方。常見的如將mysql中的數據同步到另外一個mysql中,或者另外一個mongodb中。
工作流程
- read:設置一個源,DataX從源讀取數據
- write:設置一個目的地,DataX將讀取到的數據寫入目的地
- setting:同步設置,如設置併發通道、控製作業速度等
- Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩衝,流控,併發,數據轉換等核心技術問題
- 多線程:充分利用多線程來處理同步任務
核心架構
核心模塊介紹
1:DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
2:DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
3:切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的併發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發運行完畢分配好的所有Task,預設單個任務組的併發數量為5
4:每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作
5:DataX作業運行起來之後, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,進程退出值非0
DataX調度流程
舉例來說,用戶提交了一個DataX作業,並且配置了20個併發,目的是將一個100張分表的mysql數據同步到odps裡面。 DataX的調度決策思路是:
-
DaXJob根據分庫分表切分成了100個Task。
-
根據20個併發,DataX計算共需要分配4個TaskGroup。
-
4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個併發共計運行25個Task。
支持的數據
類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
---|---|---|---|---|
RDBMS 關係型資料庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
OceanBase | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
達夢 | √ | √ | 讀 、寫 | |
通用RDBMS(支持所有關係型資料庫) | √ | √ | 讀 、寫 | |
阿裡雲數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | √ | 讀 、寫 | |
NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Hive | √ | √ | 讀 、寫 | |
無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 |
實踐
作為極簡教程,本文將從mysql中讀取一張表的數據,然後同步到clickhouse中。
下載
打開該項目的Github 首頁進行下載:https://github.com/alibaba/DataX
下載鏈接:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
下載下來是一個tar.gz的包,windows下解壓命令:
tar -zxvf xxx.tar.gz
程式目錄:
- bin:使用裡面的 datax.py 來啟動程式
- job:裡面放了一個job.json,用來檢查運行環境,一般的建議下載完畢之後執行一次。
- log:存放執行日誌
- plugin:插件集,插件分為read和write,分別對應datax可支持的資料庫
- 其他目錄:......
環境
DataX是基於python和java的,需要機器擁有python和java 的運行環境。
在下載完畢後,通過執行自檢腳本,可確認環境是否正確
python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
執行流程
編寫同步任務配置文件,在job目錄中創建 mysql-to-clickhouse.json 文件,並填入如下內容
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": [
"id",
"name"
],
"splitPk": "id",
"connection": [
{
"table": [
"table_name"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.xxx:xxx/db_name"
]
}
]
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": [
"id",
"ame"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://192.168.1.xxx:xxx/table_name",
"table": [
"table_name"
]
}
],
"preSql": [],
"postSql": [],
"batchSize": 65536,
"batchByteSize": 134217728,
"dryRun": false,
"writeMode": "insert"
}
}
}
]
}
}
- job:一個job包含兩個部分,setting中設置任務的執行速度,錯誤限制等,content中是任務具體的描述。
- reader:任務的數據輸入源
- writer:任務的數據輸出源
根據任務配置文件啟動datax,先cd到datax的根目錄
python bin/datax.py job/mysql-to-clickhouse.json
運行上述命令後,任務就開啟了。本例從mysql資料庫中的一張表中讀取了兩個欄位(id,name),然後同步到clickhouse中,clickhouse中需要先創建同樣的庫,表和列。
任務執行非常快,140W數據僅用了 18s 就完成了同步。
2024-05-16 16:24:57.312 [job-0] INFO JobContainer -
任務啟動時刻 : 2024-05-16 16:24:38
任務結束時刻 : 2024-05-16 16:24:57
任務總計耗時 : 18s
任務平均流量 : 2.21MB/s
記錄寫入速度 : 142425rec/s
讀出記錄總數 : 1424252
讀寫失敗總數 : 0
引用
- readme:https://github.com/alibaba/DataX
- introduction:https://github.com/alibaba/DataX/blob/master/introduction.md