xxl-job定時調度任務Java代碼分析

来源:https://www.cnblogs.com/milton/archive/2022/12/20/16994113.html
-Advertisement-
Play Games

用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 項目代碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個項目, 上午花了點時間研究了一下運行機制. 把看到的記一下. ...


簡介

用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 項目代碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個項目, 上午花了點時間研究了一下運行機制. 把看到的記一下.

環境

<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>${最新穩定版本}</version>
</dependency>

運行需要 JDK1.8, MySQL5.7

資料庫結構

  • 庫編碼 utf8mb4_unicode_ci
  • Table: xxl_job_group
    任務分組, 組名, 只支持一級分組, address_list 欄位支持多個執行端地址, 逗號分隔
  • Table: xxl_job_info
    任務主表, 記錄了任務明細, 調度明細以及預警設置
  • Table: xxl_job_log
    任務每次執行的日誌
  • Table: xxl_job_log_report
    按日對執行日誌進行統計的結果
  • Table: xxl_job_logglue
  • Table: xxl_job_registry
    用於登記任務的執行者, 記錄group:分組, key:名稱, value:介面地址. 名稱是可以重覆的, 介面地址會添加到任務分組表中的註冊欄位
  • Table: xxl_job_user
    簡單的登錄控制, 與其它表沒有關聯
  • Table: xxl_job_lock
    單欄位表, 用於併發時加鎖避免衝突

代碼結構

  • 項目用到的都是常見組件, MyBatis, FreeMarker, Bootstrap, 當前版本基於SpringBoot 2.6.7
  • 線上運行的是 xxl-job-admin 模塊, 提供執行端註冊, 任務發起和日誌記錄等服務
  • 項目中需要實現 xxl-job-executor, 項目中提供了例子

項目文件結構如下

├───doc
│   ├───db                                               # 初始化的sql
│   └───images
├───xxl-job-admin                                        # 運行的服務端模塊, 提供界面和調度
│   └───src
│       ├───main
│       │   ├───java
│       │   │   └───com
│       │   │       └───xxl
│       │   │           └───job
│       │   │               └───admin
│       │   │                   ├───controller
│       │   │                   │   ├───annotation
│       │   │                   │   ├───interceptor
│       │   │                   │   └───resolver
│       │   │                   ├───core
│       │   │                   ├───dao
│       │   │                   └───service
│       │   │                       └───impl
│       │   └───resources
│       │       ├───i18n                                 # 多國化, 簡繁英
│       │       ├───mybatis-mapper                       # xml形式的mapper
│       │       ├───static                               # 前端靜態文件
│       │       └───templates                            # Freemarker模板
│       └───test
│           └───java
│
├───xxl-job-core                                         # 公用jar包, 模塊內部依賴
│   └───src
│       └───main
│           └───java
│
└───xxl-job-executor-samples
    ├───xxl-job-executor-sample-frameless                # 任務執行層示例
    │   └───src
    │       ├───main
    │       │   ├───java
    │       │   └───resources
    │       └───test
    │           └───java
    └───xxl-job-executor-sample-springboot               # 使用SpringBoot的執行層示例
        └───src
            ├───main
            │   ├───java
            │   └───resources
            └───test

運行機制

執行端需要準備以下信息

  • adminAddresses 服務端地址, 例如 http://127.0.0.1:8080/xxl-job-admin

  • accessToken 貌似是服務端的token, 在調用服務端 api/registry, api/registryRemove 等操作時需要驗證

  • appname 執行端名稱

  • address 執行端地址, 和 ip:port 二選一, 存在則覆蓋 ip:port

  • ip 執行端IP

  • port 執行端服務埠

  • 執行端啟動後將自己註冊到服務端, 等待回調

  • 任務執行通過 XxlJobTrigger.processTrigger() 發起, 準備參數, 併在分組中選擇一個地址

  • 根據這個地址取得 ExecutorBiz, 調用 executorBiz.run() 執行任務

  • 服務端: 通過 ExecutorBizClient,

    • 調用XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    • 其中 accessToken 是服務端的accessToken
  • 執行端: 通過 ExecutorBizImpl.run()

    • 調用 XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());得到XxlJob方法
    • 通過 XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason) 執行

非 Spring 的場景

通過調用 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 這個方法將 XxlJobSimpleExecutor 實例化, 並註冊到xxl_job服務端

Spring 場景

  • @Configuration 中, 將 XxlJobSpringExecutor 作為一個 @Bean 添加到 Spring context
  • XxlJobSpringExecutor 是 XxlJobExecutor 的子類並實現了 SmartInitializingSingleton 介面的 afterSingletonsInstantiated()方法
  • afterSingletonsInstantiated()方法中
    • 調用 initJobHandlerMethodRepository(), 在這個方法中, 找到所有@XxlJob註解的方法
    • 通過 registJobHandler(), 將@XxlJob方法添加到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
    • 調用 XxlJobExecutor.start(), 將自己註冊到 xxl_job 服務端

遠程調用服務

xxl_job 並未使用Spring的服務機制, 而是內部實現了一個偵聽指定IP+埠的服務. 這個實現對應的類是 EmbedServer, 服務基於 Netty, 核心代碼是

// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

這行代碼註冊了內部的XxlJob方法

.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)

處理遠程請求時, 在下麵的代碼中, 通過executorBiz.run(triggerParam)調用XxlJob方法

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    //...
    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
    //...
}

鎖機制

通過select ... for update實現的, 這個表並沒有放到 MyBatis, 在 JobScheduleHelper 中, 通過

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

得到鎖, 在方法末尾釋放

// close PreparedStatement
if (null != preparedStatement) {
    try {
        preparedStatement.close();
    } catch (SQLException e) {
        if (!scheduleThreadToStop) {
            logger.error(e.getMessage(), e);
        }
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 軟體開發中常見的幾種不同服務模型包括SaaS(軟體即服務)、LaaS(許可即服務)、PaaS(平臺即服務)、CaaS(容器即服務)、IaaS(基礎設施即服務)和FaaS(功能即服務)。 很多人認為IaaS和FaaS是趨勢,是未來軟體設計與開發人員的基本必備技能,PowerDotNet和PowerDo ...
  • matplotlib plt.plot()繪製線性圖 繪製單條線形圖 繪製多條線形圖 設置坐標系的比例plt.figure(figsize=(a,b)) 設置圖例legend() 設置軸的標識 圖例保存 fig = plt.figure() plt.plot(x,y) figure.savefig( ...
  • 1.技術介紹 框架:unittest 請求處理:requests excel數據處理:openpyxl 參數化:ddt 配置解析器:configparser 報告模板:HTMLTestRunnerNew.py(下載地址:https://pan.baidu.com/s/1w9AZU9AkIpxCYuz ...
  • pandas的級聯和合併 級聯操作 pd.concat, pd.append pandas使用pd.concat函數,與np.concatenate函數類似,只是多了一些參數: objs axis=0 keys join='outer' / 'inner':表示的是級聯的方式,outer會將所有的項 ...
  • 1.configparser介紹 configparser是python自帶的配置參數解析器。可以用於解析.config文件中的配置參數。ini文件中由sections(節點)-key-value組成 2.安裝: pip install configparse 3.獲取所有的section impo ...
  • 1.技術介紹 框架:unittest 請求處理:requests excel數據處理:openpyxl 參數化:ddt 報告模板:HTMLTestRunnerNew.py(下載地址:https://pan.baidu.com/s/1w9AZU9AkIpxCYuzTto0EQA?pwd=1234) t ...
  • 家居網購項目實現05 以下皆為部分代碼,詳見 https://github.com/liyuelian/furniture_mall.git 12.功能11-後臺分頁(分頁顯示家居) 12.1需求分析/圖解 管理員進入到家居管理後臺頁面 點擊家居管理,可以按分頁規則顯示家居信息 12.2思路分析 分 ...
  • 1.裝飾器 #裝飾器的官方定義: 裝飾器本質上是一個Python函數(其實就是閉包),它可以讓其他函數在不需要做任何代碼變動的前提下增加額外功能,裝飾器的返回值也是一個函數對象。裝飾器用於有以下場景,比如:插入日誌、性能測試、事務處理、緩存、許可權校驗等場景。 2.ddt import unittes ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...