用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 項目代碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個項目, 上午花了點時間研究了一下運行機制. 把看到的記一下. ...
簡介
用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 項目代碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個項目, 上午花了點時間研究了一下運行機制. 把看到的記一下.
- 項目地址
- 文檔 https://www.xuxueli.com/xxl-job/
環境
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${最新穩定版本}</version>
</dependency>
運行需要 JDK1.8, MySQL5.7
資料庫結構
- 庫編碼 utf8mb4_unicode_ci
- Table: xxl_job_group
任務分組, 組名, 只支持一級分組, address_list 欄位支持多個執行端地址, 逗號分隔 - Table: xxl_job_info
任務主表, 記錄了任務明細, 調度明細以及預警設置 - Table: xxl_job_log
任務每次執行的日誌 - Table: xxl_job_log_report
按日對執行日誌進行統計的結果 - Table: xxl_job_logglue
- Table: xxl_job_registry
用於登記任務的執行者, 記錄group:分組, key:名稱, value:介面地址. 名稱是可以重覆的, 介面地址會添加到任務分組表中的註冊欄位 - Table: xxl_job_user
簡單的登錄控制, 與其它表沒有關聯 - Table: xxl_job_lock
單欄位表, 用於併發時加鎖避免衝突
代碼結構
- 項目用到的都是常見組件, MyBatis, FreeMarker, Bootstrap, 當前版本基於SpringBoot 2.6.7
- 線上運行的是 xxl-job-admin 模塊, 提供執行端註冊, 任務發起和日誌記錄等服務
- 項目中需要實現 xxl-job-executor, 項目中提供了例子
項目文件結構如下
├───doc
│ ├───db # 初始化的sql
│ └───images
├───xxl-job-admin # 運行的服務端模塊, 提供界面和調度
│ └───src
│ ├───main
│ │ ├───java
│ │ │ └───com
│ │ │ └───xxl
│ │ │ └───job
│ │ │ └───admin
│ │ │ ├───controller
│ │ │ │ ├───annotation
│ │ │ │ ├───interceptor
│ │ │ │ └───resolver
│ │ │ ├───core
│ │ │ ├───dao
│ │ │ └───service
│ │ │ └───impl
│ │ └───resources
│ │ ├───i18n # 多國化, 簡繁英
│ │ ├───mybatis-mapper # xml形式的mapper
│ │ ├───static # 前端靜態文件
│ │ └───templates # Freemarker模板
│ └───test
│ └───java
│
├───xxl-job-core # 公用jar包, 模塊內部依賴
│ └───src
│ └───main
│ └───java
│
└───xxl-job-executor-samples
├───xxl-job-executor-sample-frameless # 任務執行層示例
│ └───src
│ ├───main
│ │ ├───java
│ │ └───resources
│ └───test
│ └───java
└───xxl-job-executor-sample-springboot # 使用SpringBoot的執行層示例
└───src
├───main
│ ├───java
│ └───resources
└───test
運行機制
執行端需要準備以下信息
-
adminAddresses 服務端地址, 例如 http://127.0.0.1:8080/xxl-job-admin
-
accessToken 貌似是服務端的token, 在調用服務端 api/registry, api/registryRemove 等操作時需要驗證
-
appname 執行端名稱
-
address 執行端地址, 和 ip:port 二選一, 存在則覆蓋 ip:port
-
ip 執行端IP
-
port 執行端服務埠
-
執行端啟動後將自己註冊到服務端, 等待回調
-
任務執行通過 XxlJobTrigger.processTrigger() 發起, 準備參數, 併在分組中選擇一個地址
-
根據這個地址取得 ExecutorBiz, 調用 executorBiz.run() 執行任務
-
服務端: 通過 ExecutorBizClient,
- 調用
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
- 其中
accessToken
是服務端的accessToken
- 調用
-
執行端: 通過
ExecutorBizImpl.run()
- 調用
XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
得到XxlJob方法 - 通過
XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason)
執行
- 調用
非 Spring 的場景
通過調用 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 這個方法將 XxlJobSimpleExecutor 實例化, 並註冊到xxl_job服務端
Spring 場景
- 在
@Configuration
中, 將 XxlJobSpringExecutor 作為一個@Bean
添加到 Spring context - XxlJobSpringExecutor 是 XxlJobExecutor 的子類並實現了
SmartInitializingSingleton
介面的afterSingletonsInstantiated()
方法 - 在
afterSingletonsInstantiated()
方法中- 調用 initJobHandlerMethodRepository(), 在這個方法中, 找到所有
@XxlJob
註解的方法 - 通過
registJobHandler()
, 將@XxlJob
方法添加到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
- 調用
XxlJobExecutor.start()
, 將自己註冊到 xxl_job 服務端
- 調用 initJobHandlerMethodRepository(), 在這個方法中, 找到所有
遠程調用服務
xxl_job 並未使用Spring的服務機制, 而是內部實現了一個偵聽指定IP+埠的服務. 這個實現對應的類是 EmbedServer, 服務基於 Netty, 核心代碼是
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
這行代碼註冊了內部的XxlJob方法
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)
處理遠程請求時, 在下麵的代碼中, 通過executorBiz.run(triggerParam)
調用XxlJob方法
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
//...
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
//...
}
鎖機制
通過select ... for update
實現的, 這個表並沒有放到 MyBatis, 在 JobScheduleHelper 中, 通過
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
得到鎖, 在方法末尾釋放
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}