一、概述 DataX 是阿裡雲 DataWorks數據集成 的開源版本,在阿裡巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS) ...
目錄
一、概述
DataX 是阿裡雲 DataWorks數據集成 的開源版本,在阿裡巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各種異構數據源之間高效的數據同步功能。
Gitee:https://github.com/alibaba/DataX
GitHub地址:https://github.com/alibaba/DataX
文檔:https://github.com/alibaba/DataX/blob/master/introduction.md
DataX 是一個異構數據源離線同步工具,致力於實現包括關係型資料庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。
- 為瞭解決異構數據源同步問題,DataX將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。
- DataX在阿裡巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。
二、DataX3.0框架設計
DataX本身作為離線數據同步框架,採用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
- Reader:Reader為數據採集模塊,負責採集數據源的數據,將數據發送給Framework。
- Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
- Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩衝,流控,併發,數據轉換等核心技術問題。
三、DataX3.0架構
DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關係。
1)核心模塊介紹
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
- 切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的併發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發運行完畢分配好的所有Task,預設單個任務組的併發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動
Reader—>Channel—>Writer
的線程來完成任務同步工作。 - DataX作業運行起來之後, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,進程退出值非0
2)DataX調度流程
舉例來說,用戶提交了一個DataX作業,並且配置了20個併發,目的是將一個100張分表的mysql數據同步到odps(Open Data Processing Service:開發數據處理服務)
裡面。 DataX的調度決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個併發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個併發共計運行25個Task。
四、環境部署
1)下載
$ mkdir -p /opt/bigdata/hadoop/software/datax ; cd /opt/bigdata/hadoop/software/datax
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
$ tar -xf datax.tar.gz -C /opt/bigdata/hadoop/server/
2)設置環境變數
$ cd /opt/bigdata/hadoop/server/
$ vi /etc/profile
export DATAX_HOME=/opt/bigdata/hadoop/server/datax
export PATH=$DATAX_HOME/bin:$PATH
$ source /etc/profile
3)官方示例
從stream讀取數據並列印到控制台
- 【第一步】創建作業的配置文件(json格式)
可以通過命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
# 需要註意,這裡需要安裝python2,雖然官網說Pytho3也可以,其實datax.py裡面還是python2的語法
$ yum -y install python2
$ cd $DATAX_HOME/bin
$ python2 datax.py -r streamreader -w streamwriter
根據模板配置json如下:
$ cat > stream2stream.json<<EOF
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
EOF
【小技巧】vi json格式化:%!python -m json.tool
執行
$ python2 datax.py ./stream2stream.json
發現報錯了
【解決】
$ rm -fr /opt/bigdata/hadoop/server/datax/plugin/*/._*
再執行
五、實戰示例
DataX目前已經有了比較全面的插件體系,主流的RDBMS資料庫、NOSQL、大數據計算系統都已經接入。DataX目前支持數據如下圖,詳情請查看GitHub官方文檔:
1)MYSQL to HDFS
1、準備好庫表數據
$ mysql -uroot -p
密碼:123456
creta database datax;
CREATE TABLE IF NOT EXISTS `datax`.`person` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` VARCHAR(32) COMMENT '用戶名',
`age` int(10) COMMENT '年齡',
PRIMARY KEY (`id`)
)ENGINE=INNODB DEFAULT CHARSET=utf8;
insert into person(name,age) values ('person001',18) ,('person002',19),('person003',20),('person004',21),('person005',22);
select * from datax.person;
2、配置json文件
$ cd $DATAX_HOME
$ mkdir test
$ cat > ./test/mysql2hdfs <<EOF
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/tmp/datax/",
"fileName": "person",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
EOF
$ hadoop fs -mkdir /tmp/datax/
3、執行
$ cd $DATAX_HOME
$ python2 bin/datax.py test/mysql2hdfs
【溫馨提示】如果mysql連接不上,請更換對應版本的mysql驅動,
$DATA_HOME/plugin/reader/mysqlreader/libs/mysql-connector-java-*
4、驗證
打開HDFS web檢查
1)MYSQL to Hive
1、準備好hive庫表數據
$ beeline -u jdbc:hive2://hadoop-node1:11000 -n root
-- 創建庫
CREATE DATABASE datax
-- 創建表時指定庫,指定分隔符
CREATE TABLE IF NOT EXISTS datax.hive_person (
id INT COMMENT 'ID',
name STRING COMMENT '名字',
age INT COMMENT '年齡'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
2、配置json文件
【溫馨提示】其實這裡也是推送數據HDFS文件,只不過時推送到表目錄下。只需要將上面的json配置改一行就行了。完整配置如下:
$ cd $DATAX_HOME
$ mkdir test
$ cat > ./test/mysql2hive <<EOF
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/user/hive/warehouse/datax.db/hive_person",
"fileName": "person",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
EOF
3、執行
$ python2 bin/datax.py test/mysql2hive
4、驗證
打開HDFS web頁面
登錄hive客戶端查看hive表數據
$ beeline -u jdbc:hive2://hadoop-node1:11000 -n root
$ select * from datax.hive_person;
3)HDFS to MYSQL
1、準備好HDFS文件數據
$ cd $DATAX_HOME
$ cat >./test/person2.txt<<EOF
1,p1,21
2,p2,22
3,p3,30
4,p4,35
5,p5,31
6,p6,33
EOF
# 將文件推送到HDFS上
$ hadoop fs -put ./test/person2.txt /tmp/datax/
2、準備好MySQL表
CREATE TABLE IF NOT EXISTS `datax`.`person2` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` VARCHAR(32) COMMENT '用戶名',
`age` int(10) COMMENT '年齡',
PRIMARY KEY (`id`)
)ENGINE=INNODB DEFAULT CHARSET=utf8;
3、配置json文件
$ cat >./test/hdfs2mysql.json<<EOF
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/tmp/datax/person2.txt",
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "long"
}
],
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age"
],
"preSql": [
"delete from person2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true",
"table": [
"person2"
]
}
]
}
}
}
]
}
}
EOF
4、執行
$ python2 ./bin/datax.py ./test/hdfs2mysql.json
5、驗證
登錄mysql查看
$ mysql -uroot -p
密碼:123456
select * from datax.person2;
六、DataX-WEB 安裝部署
GitHub地址:https://github.com/WeiYe-Jing/datax-web
1)下載
下載地址:
https://pan.baidu.com/share/init?surl=3yoqhGpD00I82K4lOYtQhg
提取碼:cpsk
2)解壓
$ cd /opt/bigdata/hadoop/software
$ tar -xf datax-web-2.1.2.tar.gz -C /opt/bigdata/hadoop/server/
3)配置環境變數
$ cd /opt/bigdata/hadoop/server/datax-web-2.1.2
$ vi /etc/profile
export DATAXWEB_HOME=/opt/bigdata/hadoop/server/datax-web-2.1.2
export PATH=$DATAXWEB_HOME/bin:$PATH
$ source /etc/profile
4)創建dataxweb資料庫
$ mysql -uroot -p -hhadoop-node1
密碼:123456
create database dataxweb;
5)執行一鍵安裝腳本
$ cd $DATAXWEB_HOME
$ ./bin/install.sh
6)修改配置
1、修改datax-admin配置
$ cd $DATAXWEB_HOME
# 修改資料庫配置,如果上面配置了,就可以跳過
$ vi ./modules/datax-admin/conf/bootstrap.properties
# 配置環境變數
$ vi ./modules/datax-admin/bin/env.properties
# web埠
SERVER_PORT=18088
# 創建 mybatis-plus列印sql日誌預設目錄,預設路徑:$ $DATAXWEB_HOME/modules/datax-admin/data/applogs/admin,要修改就這個配置文件:$DATAXWEB_HOME/modules/datax-admin/conf/application.yml
$ mkdir -p $DATAXWEB_HOME/modules/datax-admin/data/applogs/admin
2、修改datax-executor配置
$ cd $DATAXWEB_HOME
# 修改資料庫配置,如果上面配置了,就可以跳過
$ vi ./modules/datax-executor/conf/bootstrap.properties
# 配置環境變數
$ vi ./modules/datax-executor/bin/env.properties
# 主要修改配置如下:
## PYTHON腳本執行位置
PYTHON_PATH=/opt/bigdata/hadoop/server/datax/bin/datax.py
## 保持和datax-admin埠一致,更datax-admin的SERVER_PORT對應
DATAX_ADMIN_PORT=18088
# 創建 日誌預設目錄,預設路徑:$DATAXWEB_HOME/modules/datax-executor/data/applogs/executor/jobhandler,要修改就這個配置文件:$DATAXWEB_HOME/modules/datax-executor/conf/application.yml
$ mkdir -p $DATAXWEB_HOME/modules/datax-executor/data/applogs/executor/jobhandler
7)啟動服務
$ cd $DATAXWEB_HOME
$ ./bin/start-all.sh
# 或者分模塊啟動
$ ./bin/start.sh -m datax-admin
$ ./bin/start.sh -m datax-executor
# 查看datax-admin啟動日誌
$DATAXWEB_HOME/modules/datax-admin/bin/console.out
# 查看datax-executor啟動日誌
$DATAXWEB_HOME/modules/datax-executor/bin/console.out
web訪問:http://hadoop-node1:18088/index.html
預設賬號/密碼:admin/123456
8)簡單使用
前期準備
1、新建項目
2、創建hive庫和表
$ beeline
create database dataxweb;
CREATE TABLE IF NOT EXISTS dataxweb.hive_person(
id INT COMMENT 'ID',
name STRING COMMENT '名字',
age INT COMMENT '年齡'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
3、創建dataxweb person表
CREATE TABLE `dataxweb`.`person` (
`id` int NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(32) DEFAULT NULL COMMENT '用戶名',
`age` int DEFAULT NULL COMMENT '年齡',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3;
1、MYSQL to Hive
創建任務
json配置如下:
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/dataxweb?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/user/hive/warehouse/dataxweb.db/hive_person",
"fileName": "person",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
執行,也可以定時執行
查看日誌
2、Hive to MYSQL
創建任務
json配置如下:
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/dataxweb.db/hive_person/person__7c10087d_a834_4558_b830_26322bad724b",
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "long"
}
],
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age"
],
"preSql": [
"delete from dataxweb.person"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop-node1:3306/dataxweb?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true",
"table": [
"person"
]
}
]
}
}
}
]
}
}
執行,也可以定時執行
查看日誌
其實知道上面datax命令操作,web端操作就非常簡單了,這裡只是簡單的實現了兩個示例,其它的小伙伴也可以試試,也非常簡單
【溫馨提示】執行機必須要有python環境變數哦!!!
七、DataX和Sqoop的比較
關於Sqoop,可以參考我之前的文章:大數據Hadoop之——數據轉換工具Sqoop
1)Sqoop主要特點
- 可以將關係型資料庫中的數據導入hdfs、hive或者hbase等hadoop組件中,也可將hadoop組件中的數據導入到關係型資料庫中;
- Sqoop在導入導出數據時,充分採用了map-reduce計算框架,根據輸入條件生成一個map-reduce作業,在hadoop集群中運行;
- 採用map-reduce框架同時在多個節點進行import或者export操作,速度比單節點運行多個並行導入導出效率高,同時提供了良好的併發性和容錯性;
- 支持insert、update模式,可以選擇參數,若內容存在就更新,若不存在就插入;
- 對國外的主流關係型資料庫支持性更好。
2)DataX主要特點
- 異構資料庫和文件系統之間的數據交換;
- 採用Framework + plugin架構構建,Framework處理了緩衝,流控,併發,上下文載入等高速數據交換的大部分技術問題,提供了簡單的介面與插件交互,插件僅需實現對數據處理系統的訪問;
- 數據傳輸過程在單進程內完成,全記憶體操作,不讀寫磁碟,也沒有IPC;
- 開放式的框架,開發者可以在極短的時間開發一個新插件以快速支持新的資料庫/文件系統。
3)Sqoop和DataX的區別
Sqoop
採用map-reduce計算框架進行導入導出,而datax僅僅在運行datax的單台機器上進行數據的抽取和載入,速度比Sqoop
慢了許多;Sqoop
只可以在關係型資料庫和hadoop組件之間進行數據遷移,而在hadoop相關組件之間,比如hive和hbase之間就無法使用Sqoop
互相導入導出數據,同時在關係型資料庫之間,比如mysql和oracle之間也無法通過sqoop導入導出數據;- 與之相反,
DataX
能夠分別實現關係型資料庫和hadoop組件之間、關係型資料庫之間、hadoop組件之間的數據遷移; Sqoop
是專門為hadoop而生,對hadoop支持度好,而DataX
可能會出現不支持高版本hadoop的現象;Sqoop
只支持官方提供的指定幾種關係型資料庫和hadoop組件之間的數據交換,而在DataX
中,用戶只需根據自身需求修改文件,生成相應rpm包,自行安裝之後就可以使用自己定製的插件;Sqoop
不支持ORC文件格式,而DataX
支持。
Sqoop和DataX各有優缺點,根據應用場景選擇,如有什麼疑問歡迎給我留言,後續會有更多關於大數據的文章。