![file](https://img2023.cnblogs.com/other/2685289/202306/2685289-20230629181452343-550852462.png) 作者|雲科NearFar X Lab團隊 左益、周志銀、洪守偉、陳超、武超 ## 一、導讀 無錫拈花雲科 ...
作者|雲科NearFar X Lab團隊 左益、周志銀、洪守偉、陳超、武超
一、導讀
無錫拈花雲科技服務有限公司(以下簡稱:拈花雲科)是由拈花灣文旅和北京滴普科技共同孵化的文旅目的地數智化服務商。2022年底,拈花雲科NearFar X Lab團隊開始測試DolphinScheduler作為交付型項目和產品項目的任務調度工具。本文主要分享了拈花雲科在任務調度工具的選擇、迭代和實踐過程中的經驗,希望對大家有所啟發。
二、業務背景
我們的服務對象主要是國內各個景區、景點,業務範圍涵蓋文旅行業的多個板塊,如票務、交通、零售、住宿、餐飲、演繹、游樂、影院、KTV、租賃、服務、會務、康樂、康養、電商、客服、營銷、分銷、安防等。由於業務系統來源較多,多系統下的數據源類型差異化較大,所以在實施數據項目時我們需要能夠支持多種數據來源(Mysql、Oracle、SqlServer、Hive、Excel……)的數據集成任務。同時根據大部分景區為國有化的特點,我們也需要具備能夠提供私有化交付部署及SAAS化數據中台產品解決方案的雙重服務支撐能力。
三、DolphinScheduler 調度系統選型過程
在團隊成立之初為了快速構建MVP業務版本,我們沿用了團隊同事之前用過的Kettle調度方案。該方案下通過Kettle完成可視化調度的配置及對於異構數據的集成任務,通過Python 調用HQL腳本完成基於Hive的傳參數據計算。
基於MVP的構建,我們也開始思考,在我們的整體中台架構下該需要一個什麼樣的調度系統,以及除了調度這件事本身我們還需要哪些功能和能力。帶著這些問題我們開始整理自己的需求,以及每個需求下有什麼樣的產品可以適配。
調度系統需要支撐的應用場景
文旅業態下的數據使用場景與其它業態下的使用場景大體相同,主要分為以下四類:
調度系統需要支撐的項目類型
我們選擇的調度系統需要同時具備實施類項目、SAAS產品兩種需求下的數據中台支撐能力
基於以上需求我們進行了調度系統的選型對比。網上有非常多關於Oozie、Azkaban、Airflow、DolphinScheduler、Xxl-job、Kettle等調度選型的文章及介紹,在此不過多的展開他們的優缺點。我們覺得每個產品的設計都有它自身的考量,都有適用與不適用的場景。結合我們自身的使用需求最終我們選擇了使用DolphinScheduler作為數據中台的調度平臺。
主要原因如下:
- High Reliability(高可靠性)
高可靠性是我們看重的第一要點,因為不管是實施項目還是SAAS產品,只有系統穩定產品才可以正常運行。DolphinScheduler通過去中心化設計、原生 HA 任務隊列支持、過載容錯能力支持提供了高度穩健的環境。在我們半年的使用過程中也驗證了其非常穩定。 - High Scalability:(高擴展性)
由於我們要支持實施項目/SAAS產品兩種場景下的使用,DolphinScheduler的多租戶支持很好的契合了SAAS場景下資源隔離的使用需求。同時其擴縮容能力、高度的調度任務上限(10萬+)都能很好的支撐我們業務的擴展性需求。 - 豐富的數據集成能力
DolphinScheduler提供的任務類型已經遠遠涵蓋了我們經常使用的任務類型(DataX、SeaTunnel的支持本身就涵蓋了較多的Source2Target同步/推送場景)。 - 支持Kubernetes部署
上文提到在私有化的部署方式下客戶的部署環境大不相同,對於實施團隊來說,如果能夠簡單、高效、一致的完成部署則會極大的提高項目投遞部署效率,同時也能減少很多因環境原因而產生的問題。 - 不僅僅是調度
在調研DolphinScheduler的過程中我們發現,除了調度本身這個環節,結合DCMM(數據管理能力成熟度評估模型)的國標8個能力域,DolphinScheduler在數據質量模塊也做了很多實現,這無疑又命中了我們對於數據質量能力建設的需求。同時Apache DolphinScheduler的服務架構中還有AlertServer服務。作為整體數據中台方案來說後期完全可以將所有報警需求集成在Apache DolphinScheduler的報警服務中,避免多系統重覆造輪子。從這些點考慮DolphinScheduler它不僅僅是一個調度工具而更像是一個數據開發平臺。(期待隨著社區的迭代會有更完整的生態實現) - 問題處理難度
DolphinScheduler社區非常的活躍,在加入DolphinScheduler社區群後每天都可以看到非常多的伙伴在交流關於Apache DolphinScheduler使用過程中的問題,社區人員也都積極的予以回覆。同時Dolphinscheduler又是咱們國產開源軟體,所以完全不必擔心存在溝通上的障礙。
四、基於DolphinScheduler的項目實踐
1、DolphinScheduler ON Kubernetes
DolphinScheduler支持多種部署方式:單機部署(Standalone)、偽集群部署(Pseudo-Cluster)、集群部署(Cluster)、Kubernetes部署(Kubernetes)。在項目實施的場景下由於客戶提供的部署環境千變萬化,我們需要一種穩定、快速、不挑環境的部署方式。Apache DolphinScheduler on K8S的部署方式很好的滿足了我們的需求,此部署方式能極大的提高整體項目的部署效率及動態擴展性。
- Kubernetes是一個開源的容器編排平臺,可以實現容器的自動化部署、擴縮容、服務發現、負載均衡,可以提高DolphinScheduler的可用性、可擴展性和可維護性
- Kubernetes可以支持多種存儲類型,包括本地存儲、網路存儲和雲,可以滿足DolphinScheduler多節點共用持久化存儲需求
- Kubernetes可以支持多種調度策略,包括親和性、反親和性、污點和容忍,可以優化DolphinScheduler的資源利用率,提高任務執行效率。
- Kubernetes可以支持多種監控和日誌方案,包括Prometheus、Grafana、ELK等等,可以方便地對DolphinScheduler的運行狀態和性能進行監控,分析和告警
在部署Apache DolphinScheduler on K8S 的過程中我們也曾遇到過一些問題,下麵是我們總結的一些Kubernetes部署要點:
自定義鏡像
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-alert-server:版本號
# 如果你想支持 MySQL 數據源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
dolphinscheduler-api
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-api:版本號
# 如果你想支持 MySQL 數據源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
# 如果你想支持 Oracle 數據源
COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs
dolphinscheduler-master
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-master:版本號
# 如果你想支持 MySQL 數據源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
dolphinscheduler-tools
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-tools:版本號
# 如果你想支持 MySQL 數據源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
dolphinscheduler-worker
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:版本號
# 如果你想支持 MySQL 數據源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
# 如果你想支持 Oracle 數據源
COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs
# 如果你想支持 hadoop 數據源
COPY ./hadoop-common-2.7.3.jar /opt/dolphinscheduler/libs
COPY ./hadoop-core-1.2.1.jar /opt/dolphinscheduler/libs
# 如果你想支持 hive 數據源
COPY ./hive-common.jar /opt/dolphinscheduler/libs
COPY ./hive-jdbc.jar /opt/dolphinscheduler/libs
COPY ./hive-metastore.jar /opt/dolphinscheduler/libs
COPY ./hive-serde.jar /opt/dolphinscheduler/libs
COPY ./hive-service.jar /opt/dolphinscheduler/libs
# 安裝python3環境
RUN apt-get update && \
apt-get install -y --no-install-recommenApache DolphinScheduler curl && \
rm -rf /var/lib/apt/lists/*
RUN apt-get update && \
apt-get install -y --no-install-recommenApache DolphinScheduler libcurl4-openssl-dev libssl-dev && \
rm -rf /var/lib/apt/lists/*
RUN apt-get update && \
apt-get install -y --no-install-recommenApache DolphinScheduler python3 && \
rm -rf /var/lib/apt/lists/*
RUN apt-get update && \
apt-get install -y --no-install-recommenApache DolphinScheduler python3-pip && \
rm -rf /var/lib/apt/lists/*
# 安裝dataX 並且解壓縮
COPY ./datax.tar.gz /home
RUN tar -zxvf /home/datax.tar.gz -C /opt
配置文件修改
官方教程中的helm進行安裝,在安裝過程中需要修改源碼中 "/deploy/kubernetes/dolphinscheduler/
" 路徑下的 "values.yaml,Chart.yaml
" 里的相關鏡像和版本(註:原配置沒有指定持久化儲存類,會使用預設的存儲類,建議是修改並使用可多節點讀寫並且可以彈性擴展的,多節點讀寫便於worker節點共用同一套lib)
生產配置
- dolphinscheduler-api 3 副本(註:預設是 1副本,但是實際使用中發現,平臺頁面在使用過程中會有卡頓,增加副本數之後,會有明顯改善)
- dolphinscheduler-alert 1副本
- dolphinscheduler-zookeeper 1副本
- dolphinscheduler-worker 3副本
- dolphinscheduler-master 3副本
k8s部署總結
採用k8s部署後,最大感受就是可排除環境干擾,快速擴展,遷移,部署,幫助我司實現了數據中台私有化中的調度標準化,該方案已在多個景區中進行快速落地並應用。
2、基於SQL腳本血緣的DolphinScheduler工作流自動化實現
項目背景
基於景區中各個業務系統(票務、營銷、安防、酒店、商業、能耗、停車等)在景區機房下建設數據中台,完成以下應用需求:
- 滿足各個業務部門日常的報表需求
- 支持各類大屏看板展示
- 為景區的管理層決策提供數據依據
技術選型
數據倉庫:Doris
調度工具:DolphinScheduler 使用版本:3.0.0
版本管理:Gitlab
容器編排:Kubernete
處理流程
- 業務分析與指標確認:與業務部門溝通,瞭解業務需求和目標,明確數據指標的定義、計算邏輯和展示方式。
- 數據倉庫分層和設計:根據數據倉庫的四層架構(ODS、DWD、DWS、ADS),設計數據模型和表結構,規範命名和註釋。
- 數據腳本開發:編寫數據抽取、清洗、轉換、載入等腳本,實現數據從源系統到目標表的流轉和處理。
- 數據任務調度:入倉後的執行腳本通過血緣識別依賴自動錄入DolphinScheduler形成工作流任務調度(設置任務依賴、觸發條件、重試策略等參數)監控任務運行狀態和日誌。
- 數據輸出和文檔:輸出標準指標庫和相關文檔,供BI、可視化報表等使用,同時編寫數據開發文檔,記錄數據開發過程中的關鍵信息和問題。
下麵介紹下我們根據SQL血緣識別自動生成Apache DolphinScheduler調度任務的實現過程:
在DolphinScheduler平臺上開發數據調度工作流的過程中我們遇到一個問題:如果一個工作流下的任務量太多,在原有的UI界面中想通過界面方式完成配置更改以及血緣關係的建立等操作會非常不便捷。即便是通過任務定義去配置,當上百個任務同時需要配置依賴關係時也是一個不小的工作量開銷而且還容易出錯,後期的更新迭代也較為不便。
我們結合工作流下SQL任務本身的特點(數倉SQL任務是整體按照Apache DolphinScheduler、DWD、DWS、Apache DolphinScheduler 的計算流程進行計算,每個表對應一個Apache DolphinScheduler的Task既每個Task都會包含SourceTable及TargetTabe。通過這層關係我們就可以構建起完整的數倉任務血緣依賴)。基於以上想法我們實現了從數據腳本自動生成對應的工作流和任務的自動化方案:
- 數據入倉後的開發腳本以每個表單為單位對應生成一個TaskSQL腳本提交到git。
- 自動化工具生成工作流及任務前自動從git庫中獲取最新的數據腳本。
- 自動化工具拉取到最新代碼後識別和分析所有數據腳本之間的血緣關係。
- 自動化工具通過血緣關係自動將所有的任務關聯並組裝到一個工作流中:每個任務執行完成後,會立即觸發下游任務,最大化地利用伺服器資源。
以下是該實現的核心代碼:
sql解析
SqlParse是使用阿裡的druid中的組件MySqlStatementParser
/**
* sql解析
* @param sqlStr
* @return
*/
public static Map<String, Set<String>> sqlParser(String sqlStr) {
List<String> sqlList = StrUtil.split(sqlStr, ";");
Map<String, Set<String>> map = new HashMap<>();
for (String sql : sqlList) {
if (StrUtil.isBlank(sql)) {
continue;
}
// 這裡使用的時 mysql 解析
MySqlStatementParser parser = new MySqlStatementParser(sql);
SQLStatement sqlStatement = parser.parseStatement();
MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
sqlStatement.accept(visitor);
Map<TableStat.Name, TableStat> tableStatMap = visitor.getTables();
for (Map.Entry<TableStat.Name, TableStat> tableStatEntry : tableStatMap.entrySet()) {
String name = tableStatEntry.getKey().getName();
// 這裡的 value 有兩種 Select(父級)、Insert(子級)
String value = tableStatEntry.getValue().toString();
if (map.containsKey(value)) {
map.get(value).add(name);
} else {
Set<String> list = new HashSet<>();
list.add(name);
map.put(value, list);
}
}
}
return map;
}
節點對象定義
/**
* 任務節點定義
*/
public class Apache DolphinSchedulerTaskNode implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 源表
*/
private List<String> sourceTableName = new ArrayList<>();
/**
* 目標表
*/
private String targetTableName;
/**
* 源sql
*/
private String sql;
/**
* 用sql做一個MD5簽名
*/
private String md5;
/**
* 用sql名稱
*/
private String name;
/**
* 任務code
*/
private Long taskCode;
...
}
/**
* 樹型節點定義
*/
public class MyTreeNode extenApache DolphinScheduler Apache DolphinSchedulerTaskNode {
/**
* 添加一個子節點列表屬性
*/
private List<MyTreeNode> children;
...
}
樹型結構組裝
/**
* 解析sql,並組裝node
*
* @param files
* @return
*/
private static List<MyTreeNode> treeNodeProcess(List<File> files) {
List<MyTreeNode> sourceList = new ArrayList<>();
for (File sqlFile : files) {
// 1 取出裡面的 sql 腳本內容
String sql = FileUtil.readUtf8String(sqlFile);
// 2 解析裡面的腳本內容
Map<String, Set<String>> map = null;
try {
// 血緣解析
map = AutoCreateTask.sqlParser(sql);
} catch (Exception e) {
log.error(" table-parser error: {}", sqlFile.getPath());
}
// 3 將每一個sql的 source , target 解析出來
if (ObjectUtil.isNotNull(map)) {
MyTreeNode treeNode = new MyTreeNode();
treeNode.setName(sqlFile.getName());
if (map.containsKey("Select")) {
Set<String> select = map.get("Select");
treeNode.setSourceTableName(new ArrayList<>(select));
}
treeNode.setSql(sql);
if (map.containsKey("Insert")) {
Set<String> insert = map.get("Insert");
treeNode.setTargetTableName(new ArrayList<>(insert).get(0));
}
sourceList.add(treeNode);
}
}
// 將sql按照 source , target 組成 樹狀結構
return TreeUtil.getTree(sourceList);
}
/**
* 組成樹狀結構
* @param sourceList
* @return
*/
public static List<MyTreeNode> getTree(List<MyTreeNode> sourceList) {
Map<String, Set<MyTreeNode>> sourceMap = new HashMap<>();
Map<String, Set<MyTreeNode>> targetMap = new HashMap<>();
for (MyTreeNode node : sourceList) {
//源表Map
List<String> subSourceTableList = node.getSourceTableName();
if (IterUtil.isNotEmpty(subSourceTableList)) {
for (String subSourceTable : subSourceTableList) {
if (sourceMap.containsKey(subSourceTable)) {
sourceMap.get(subSourceTable).add(node);
} else {
Set<MyTreeNode> nodeSet = new HashSet<>();
nodeSet.add(node);
sourceMap.put(subSourceTable, nodeSet);
}
}
}
//目標表Map
String targetTableName = node.getTargetTableName();
if (targetMap.containsKey(targetTableName)) {
targetMap.get(targetTableName).add(node);
} else {
Set<MyTreeNode> nodeSet = new HashSet<>();
nodeSet.add(node);
targetMap.put(targetTableName, nodeSet);
}
}
//創建一個列表,用於存儲根節點
List<MyTreeNode> rootList = new ArrayList<>();
for (MyTreeNode node : sourceList) {
// 將子節點處理好
String targetTableName = node.getTargetTableName();
if (sourceMap.containsKey(targetTableName)) {
List<MyTreeNode> childrenList = node.getChildren();
if (IterUtil.isEmpty(childrenList)) {
childrenList = new ArrayList<>();
node.setChildren(childrenList);
}
childrenList.addAll(sourceMap.get(targetTableName));
}
List<String> subSourceTableList = node.getSourceTableName();
boolean isRoot = true;
for (String subSourceTable : subSourceTableList) {
if (targetMap.containsKey(subSourceTable)) {
isRoot = false;
break;
}
}
if (isRoot) {
rootList.add(node);
}
}
return rootList;
}
部分效果圖展示:
自動化生成的任務定義
自動化生成的工作流定義圖
增量運行結果
自動化實現總結
- 數倉調度任務的秒級上線與切換
通過該方式將數倉開發與DolphinScheduler解耦,實現了整體數據調度任務的秒級上線與切換。這樣,我們可以快速複製標品化數據建模,極大地簡化了實施的難度。 - 血緣進行的任務關聯與生成
通過血緣進行的任務關聯與生成,大大提升了整體的資源利用率,也減少了人工的投入和成本。 - 血緣監控和管理
通過血緣監控和管理,可以幫助我們及時發現和糾正任務執行過程中的問題和錯誤,保證數據質量和準確性。
五、未來規劃
- 離線上統一調度 :實現基於Apache SeaTunnel的離線與實時數據同步調度,使得兩個場景在一個平臺完成。
- 應用基線管理:根據各任務的上下游依賴構建數據應用基線,並監控各目標任務及其依賴任務是否按時成功運行,以確保目標任務的準時產出。
- 任務指標監控:支持實時查看每個組件的指標,例如輸入記錄數、輸出記錄數和執行時間等。
- 數據血緣關係:上報數據源、目標表、欄位等信息,構建數據血緣關係圖,方便追溯數據的來源和影響。
- 資源文件版本管理:支持資源文件等的簡單多版本管理,可以查看歷史版本、回滾到指定版本等。
六、總結與致謝
- 通過使用DolphinScheduler替換原有的調度工具,使得數據開發運維實現了平臺統一化。基於Apache DolphinScheduler提供的強大集成擴展插件能力大大提升了數據集成與數據開發的效率。
- DolphinScheduler自帶的告警管理功能非常全面。配合此功能我們建立了運維值班制度以及微信告警通知的功能。故障發生時,運維人員可以第一時間收到報警通知,有效提高了故障的感知能力。
- 基於DolphinScheduler調度技術方案在多個項目中的優異表現,使得我們更好的推動了公司的數據驅動戰略。從實踐中沉澱出公司的數據實施SOP,為各個客戶、業務部門提供了及時、準確、全面的數據分析和決策支持。
- 我們第一次部署時使用的是3.0.0 版本。目前社區已經發佈了 3.1.7 迭代速度非常快。如果你們的項目正在選型調度工具,我們強烈建議使用DolphinScheduler。加入社區進群會有非常多的前輩、大佬帶你起飛。DolphinScheduler 值得大力推薦,希望大家都能從中受益,祝願DolphinScheduler生態越來越繁榮,越來越好!
本文由 白鯨開源 提供發佈支持!