如何快速在 Apache DolphinScheduler 新擴展一個任務插件?

来源:https://www.cnblogs.com/DolphinScheduler/archive/2023/09/21/17720148.html
-Advertisement-
Play Games

作者 | 代立冬 編輯 | Debra Chen Apache DolphinScheduler 是現代數據工作流編排平臺,具有非常強大的可視化能力,DolphinScheduler 致力於使數據工程師、分析師、數據科學家等數據工作者都可以簡單輕鬆地搭建各種數據工作流,讓數據處理流程更簡單可靠。 D ...


file

作者 | 代立冬

編輯 | Debra Chen

Apache DolphinScheduler 是現代數據工作流編排平臺,具有非常強大的可視化能力,DolphinScheduler 致力於使數據工程師、分析師、數據科學家等數據工作者都可以簡單輕鬆地搭建各種數據工作流,讓數據處理流程更簡單可靠。

DolphinScheduler 非常易於使用(easy to use),目前有四種創建工作流的方法:

  • 在 UI 界面上直接通過拖放任務的方式來創建任務
  • PyDolphinScheduler,通過 Python API 創建工作流,也就是 workflow as code 的方式
  • 編寫 yaml 文件,通過 yaml 創建工作流(目前必須安裝 PyDolphinScheduler)
  • 通過 Open API 的方式來創建工作流

以上 4 種總有一種方式適合您的場景!

得益於 DolphinScheduler 採用無中心化的整體架構設計,使得 DolphinScheduler 調度性能也是同類開源數據工作流編排平臺的 5 倍以上,如果您正有這樣的性能問題或者調度延時問題,也不妨試試 DolphinScheduler。

file
DolphinScheduler界面

好的,接下來言歸正題,有不少用戶想在 DolphinScheduler 擴展新的任務插件支持(比如添加 Kettle),DolphinScheduler 的任務插件體系是基於 SPI 來進行任務插件擴展的。

什麼是 SPI 服務發現?

SPI 是 Service Provider Interface 的縮寫,是一種常見的服務提供發現機制,比如知名的 OLAP 引擎 Presto 也是使用 SPI 來擴展的。在 java.util.ServiceLoader 的文檔里有比較詳細的介紹,其抽象的概念是指動態載入某個服務實現。

比如 java.sql.Driver 介面,不同廠商可以針對同一介面做出不同的實現,比如 MySQL 和 PostgreSQL 都有不同的實現提供給用戶,而 Java 的 SPI 機制可以為某個介面尋找服務實現。Java 中 SPI 機制主要思想是將裝配的控制權移到程式之外,在模塊化設計中這個機制尤其重要,其核心思想就是解耦。

SPI 整體機製圖如下:
file

SPI 機制中有 4 個重要的組件 :

  • 服務介面 Service Interface
  • 服務介面實現:不同的服務提供方可以提供一個或多個實現;框架或者系統本身也可以提供預設的實現
  • 提供者註冊 API(Provider Registration API),這是提供者用來註冊實現的
  • 服務訪問 API (Service Access API) ,這是調用方用來獲取服務的實例的介面

Apache DolphinScheduler 從 2.0 版本開始引入 SPI。將 Apache DolphinScheduler 的 Task 看成一個執行服務,而我們需要根據使用者的選擇去執行不同的服務,如果沒有的服務,則需要我們自己擴充,我們只需要完成我們的 Task 具體實現邏輯,然後遵守 SPI 的規則,編譯成 Jar 並上傳到指定目錄,就可以使用我們自己編寫的 Task 插件來執行具體的任務了。

誰在使用它?

除了前面提到的 Presto 外,還有以下技術都使用到 SPI 技術:

1、Apache DolphinScheduler

  • Task
  • Datasource

2、Apache Flink

  • Flink sql connector,用戶實現了一個 Flink-connector 後,Flink 也是通過 SPI 來動態載入的

3、SpringBoot

  • Spring boot spi

4、JDBC

  • JDBC4 也基於 SPI 的機制來發現驅動提供商了,可以通過META-INF/services/java.sql.Driver 文件里指定實現類的方式來暴露驅動提供者

5、更多

  • common-logging

DolphinScheduler SPI工作流程

file

如上圖,Apache DolphinScheduler 中有 2 種 Task : 邏輯 Task 和物理 Task,邏輯 Task 指 Dependent Task,Switch Task 這種控制工作流邏輯的任務插件;物理 Task 是指 Shell Task,SQL Task ,Spark Task ,Python Task 等這種執行具體任務的 Task。

在 Apache DolphinScheduler 中,我們一般擴充的都是物理 Task,物理 Task 都是由 Worker 來調用並執行的,當啟動 Worker 服務時,Worker 會來載入相應的實現了規則的 Task lib,HiveTask 被 Apache DolphinScheduler TaskPluginManage 載入了。SPI 的規則圖上也有描述,也可以參考 java.util.ServiceLoader 類。

如何擴展一個任務插件?

創建 Maven 項目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.dolphinscheduler \
    -DarchetypeArtifactId=dolphinscheduler-hive-client-task \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=org.apache.dolphinscheduler \
    -DartifactId=dolphinscheduler-hive-client-task \
    -Dversion=0.1 \
    -Dpackage=org.apache.dolphinscheduler \
    -DinteractiveMode=false 

Maven 依賴

org.apache.dolphinscheduler
     dolphinscheduler-spi
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}




     org.apache.dolphinscheduler
     dolphinscheduler-task-api
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}

創建 Task 通道工廠(TaskChannelFactory)

org.apache.dolphinscheduler.spi.task.TaskChannel

插件實現以上介面即可。主要包含創建任務(任務初始化,任務運行等方法)、任務取消,如果是 yarn 任務,則需要實現 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask。

我們在 dolphinscheduler-task-api 模塊提供了所有任務對外訪問的 API,而 dolphinscheduler-spi 模塊則是 spi 通用代碼庫,定義了所有的插件模塊,比如告警模塊,註冊中心模塊等,你可以詳細閱讀查看。

首先我們需要創建任務服務的工廠,其主要作用是幫助構建 TaskChannel 以及 TaskPlugin 參數,同時給出該任務的唯一標識,ChannelFactory 在 Apache DolphinScheduler 的 Task 服務組中,其作用屬於是在任務組中的承上啟下,交互前後端以及幫助 Worker 構建 TaskChannel。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class HiveClientTaskChannelFactory implements TaskChannelFactory {
    /**
    * Create task channel, execute task through this channel
     * @return task channel
     */
    @Override
    public TaskChannel create() {
        return new HiveCliTaskChannel();
    }
    /**
    * Returns the global unique identifier of this task
     * @return task name
     */
    @Override
    public String getName() {
        return "HIVECLI";
    }
    /**
    * Parameters required for front-end pages
     * @return
     */
    @Override
    public List getParams() {
        return null;
    }
}

創建 TaskChannel

有了工廠之後,我們會根據工廠創建出 TaskChannel,TaskChannel 包含如下兩個方法,一個是取消,一個是創建,目前不需要關註取消,主要關註創建任務。

   void cancelApplication(boolean status);
    /**
     * 構建可執行任務
     */
    AbstractTask createTask(TaskRequest taskRequest);
    public class HiveClientTaskChannel implements TaskChannel {
    @Override
    public void cancelApplication(boolean b) {
        //do nothing
    }
    @Override
    public AbstractTask createTask(TaskRequest taskRequest) {
        return new HiveClientTask(taskRequest);
    }
}

構建 Task 實現

通過 TaskChannel 我們得到了可執行的物理 Task,但是我們需要給當前 Task 添加相應的實現,才能夠讓Apache DolphinScheduler 去執行你的任務,首先在編寫 Task 之前我們需要先瞭解一下 Task 之間的關係:

file

通過上圖我們可以看到,基於 Yarn 執行任務的 Task 都會去繼承 AbstractYarnTask,不需要經過 Yarn 執行的都會去直接繼承 AbstractTaskExecutor,主要是包含一個 AppID,以及 CanalApplication setMainJar 之類的方法,想知道的小伙伴可以自己去深入研究一下,如上可知我們實現的 HiveClient 就需要繼承 AbstractYarnTask,在構建 Task 之前,我們需要構建一下適配 HiveClient 的 Parameters 對象用來反序列化JsonParam。

package com.jegger.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import java.util.List;
public class HiveClientParameters extends AbstractParameters {
    /**
     * 用HiveClient執行,最簡單的方式就是將所有SQL全部貼進去即可,所以我們只需要一個SQL參數
     */
    private String sql;
    public String getSql() {
        return sql;
    }
    public void setSql(String sql) {
        this.sql = sql;
    }
    @Override
    public boolean checkParameters() {
        return sql != null;
    }
    @Override
    public List getResourceFilesList() {
        return null;
    }
}

實現了 Parameters 對象之後,我們具體實現 Task,例子中的實現比較簡單,就是將用戶的參數寫入到文件中,通過 Hive -f 去執行任務。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class HiveClientTask extends AbstractYarnTask {
    /**
     * hive client parameters
     */
    private HiveClientParameters hiveClientParameters;
    /**
     * taskExecutionContext
     */
    private final TaskRequest taskExecutionContext;
    public HiveClientTask(TaskRequest taskRequest) {
        super(taskRequest);
        this.taskExecutionContext = taskRequest;
    }
    /**
     * task init method
     */
    @Override
    public void init() {
        logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
        this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
        if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {
            throw new RuntimeException("hive client task params is not valid");
        }
    }
    /**
     * build task execution command
     *
     * @return task execution command or null
     */
    @Override
    protected String buildCommand() {
        String filePath = getFilePath();
        if (writeExecutionContentToFile(filePath)) {
            return "hive -f " + filePath;
        }
        return null;
    }
    /**
     * get hive sql write path
     *
     * @return file write path
     */
    private String getFilePath() {
        return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
    }
    @Override
    protected void setMainJarName() {
        //do nothing
    }
    /**
     * write hive sql to filepath
     *
     * @param filePath file path
     * @return write success?
     */
    private boolean writeExecutionContentToFile(String filePath) {
        Path path = Paths.get(filePath);
        try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
            writer.write(this.hiveClientParameters.getSql());
            logger.info("file:" + filePath + "write success.");
            return true;
        } catch (IOException e) {
            logger.error("file:" + filePath + "write failed.please path auth.");
            e.printStackTrace();
            return false;
        }
    }
    @Override
    public AbstractParameters getParameters() {
        return this.hiveClientParameters;
    }
}

遵守 SPI 規則

# 1,Resource下創建META-INF/services文件夾,創建介面全類名相同的文件
zhang@xiaozhang resources % tree ./
./
└── META-INF
    └── services
        └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory
# 2,在文件中寫入實現類的全限定類名
zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory 
org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory

打包和部署

## 1,打包
mvn clean install
## 2,部署
cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
## 3,restart dolphinscheduler server

以上操作完成後,我們查看 worker 日誌 tail -200f $Apache DolphinScheduler_HOME/log/Apache DolphinScheduler-worker.log

file

Apache DolphinScheduler 的插件開發就到此完成~涉及到前端的修改可以參考:
Apache DolphinScheduler-ui/src/js/conf/home/pages/dag/_source/formModel/

  • NOTICE:目前任務插件的前端還沒有實現,因此你需要單獨實現插件對應的前端頁面。

TaskChannelFactory 繼承自 PrioritySPI,這意味著你可以設置插件的優先順序,當你有兩個插件同名時,你可以通過重寫 getIdentify 方法來自定義優先順序。高優先順序的插件會被載入,但是如果你有兩個同名且優先順序相同的插件,載入插件時伺服器會拋出 IllegalArgumentException。

如果任務插件存在類衝突,你可以採用 Shade-Relocating Classes(https://maven.apache.org/plugins/maven-shade-plugin/)來解決這種問題。

如果您有興趣試試 Apache DolphinScheduler ,歡迎微信添加小助手 Leonard-ds 或加入 DolphinScheduler Slack: https://s.apache.org/dolphinscheduler-slack, 我將免費全力支持您!

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 上篇文章講述了C#集合知識點,本文將介紹C#異常處理知識點。異常處理是.NET開發中至關重要的一部分,它允許開發者在程式出現錯誤或不正常情況時採取適當的措施,從而提高應用程式的穩定性和可靠性。本文將介紹C#異常處理知識點,異常的基本概念略過,請查看官網。 1、自定義異常 開發者可以創建自定義異常類, ...
  • 在學習C#中的記錄類型時,對出現的Equals和ReferenceEquals得到的不同結果表示不理解,隨即進行相關資料查找。 值類型 == : 比較兩者的“內容”是否相同,即“值”是否一樣Equals:比較兩者的“內容”是否相同,即“值”是否一樣ReferenceEquals:返回false,因為 ...
  • shell批量執行命令與文件傳輸腳本 需求: 對未進行主機信任操作的伺服器進行批量操作 實現: 由於ssh只能在交互模式中輸入伺服器密碼進行登錄登操作,不便於進行大批量伺服器進行巡檢或日誌採集。sshpass恰好又解決了這個問題,使用ssh -p passwd可以實現命令行輸入密碼操作,便於進行規模 ...
  • 一、目錄介紹 /:表示的是根的意思 /bin:(binary)存放的是一些二進位文件,但是在Linux中二進位文件是可以被執行的。這個目錄中的命令文件是給普通用戶使用(非超級管理員用戶)。 /etc:Linux下所有的配置文件都會存放到etc目錄。 /home:是所有非root用戶家目錄的一個集合。 ...
  • 測試伺服器CPU單核及多核SuperPI圓周率測試real和user值,SuperPI是利用CPU的浮點運算能力來計算出π(圓周率),測試系統穩定性和測試CPU計算完後特定位數圓周率所需的時間;及Unixbench單核及多核測試Index得分,測試方法如下: 類型 預期結果 測試步驟 SuperPI ...
  • 可擴展性對於物聯網管理系統的設計和開發非常重要,它直接影響著系統的性能、可靠性和能耗等方面,是評估一個系統優劣的重要因素之一。可擴展性對物聯網管理系統的影響主要體現在以下幾個方面: ...
  • 1. 為什麼需要加鎖 在日常生活中,如果你心情不好想靜靜,不想被比別人打擾,你就可以把自己關進房間里,並且反鎖。這就是生活中的加鎖。 同理,對於MySQL資料庫來說的話,一般的對象都是一個事務一個事務來說的。所以,如果一個事務內,一個SQL正在更新某條記錄,我們肯定不想它被別的事務影響到嘛?因此,數 ...
  • GraphiteMergeTree該引擎用來對Graphite數據(圖數據)進行瘦身及彙總。對於想使用ClickHouse來存儲Graphite數據的開發者來說可能有用。 如果不需要對Graphite數據做彙總,那麼可以使用任意的ClickHouse表引擎;但若需要,那就採用GraphiteMerg ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...