elastic-job源碼(1)- job自動裝配

来源:https://www.cnblogs.com/lingyujuan/archive/2023/04/26/17357040.html
-Advertisement-
Play Games

1. UML類圖 UML(Unified Modeling Language,統一建模語言),用來描述軟體模型和架構的圖形化語言。 常用的UML工具軟體有PowerDesinger、Rose和Enterprise Architect。 UML工具軟體不僅可以繪製軟體開發中所需的各種圖表,還可以生成對 ...


版本:3.1.0-SNAPSHOT git地址https://github.com/apache/shardingsphere-elasticjob   Maven 坐標
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>
  Spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

在添加elasticjob-lite-spring-boot-starter啟動類的時候,會自動載入ElasticJobLiteAutoConfiguration,接下來看下ElasticJobLiteAutoConfiguration中所做的處理。   ElasticJobLiteAutoConfiguration.java
/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)


/**
 * elastic job 開關
 * elasticjob.enabled.ture預設為true
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)


/**
 * 導入
 * ElasticJobRegistryCenterConfiguration.class 註冊中心配置
 * ElasticJobTracingConfiguration.class job事件追蹤配置
 * ElasticJobSnapshotServiceConfiguration.class 快照配置
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})


/**
 * job相關配置信息
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class  創建job beans 註入spring容器
     * ScheduleJobBootstrapStartupRunner.class  執行類型為ScheduleJobBootstrap.class 的job開始運行
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}

Elastic-job 是利用zookeeper 實現分散式job的功能,所以在自動裝配的時候,需要有zookeeper註冊中心的配置。 自動裝配主要做了4件事事 1.配置zookeeper 客戶端信息,啟動連接zookeeper. 2.配置事件追蹤資料庫,用於保存job運行記錄 3.解析所有job配置文件,將所有job的bean放置在spring 單例bean中 4.識別job類型,在zookeeper節點上處理job節點數據,運行定時任務job.   第一件事:配置zookeeper 客戶端信息,啟動連接zookeeper. ZookeeperRegistryCenter.class
public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            //設置zookeeper 伺服器地址
            .connectString(zkConfig.getServerLists())
            //設置重試機制
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            //設置命名空間,zookeeper節點名稱
            .namespace(zkConfig.getNamespace());
    //設置session超時時間
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    //設置連接超時時間
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                
                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    //zookeeper 客戶端開始啟動
    client.start();
    try {
        //zookeeper 客戶端一直連接
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}

 

第二件事: 配置事件追蹤資料庫,用於保存job運行記錄

ElasticJobTracingConfiguration.java

 

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
//spring中註入bean name 為tracingDataSource的job資料庫連接信息
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    //獲取elastic-job 資料庫配置
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing ataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /**
     * dataSource 是業務資料庫
     * tracingDataSource 是job資料庫
     * 當配置elasticjob.tracing.type = RDB時,如果單獨配置job資料庫是,預設使用job資料庫作為job運行軌跡的記錄
     * 但這邊同時業務資料庫和job追蹤資料庫同時註入是,mybatis-plus 結合@Table 使用的時候,很有可能找不到正確對應的數據源
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

 

通過elasticjob.tracing.type=RDB的配置開啟事件追蹤功能,這邊job的事件追蹤數據源可以和業務數據源配置不一樣。

 

第三件事:解析所有job配置文件

ElasticJobBootstrapConfiguration.class

 

public void createJobBootstrapBeans() {
    //獲取job配置
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    //獲取單利註冊對象
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    //獲取註入zookeeper 客戶端
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    //獲取job事件追蹤
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    //構造JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

重要的是constructJobBootstraps 這個方法,來看下

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    //遍歷配置的每一個job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        //校驗 job class 和 type 都為空拋異常
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        //校驗 job class 和 type 都有 報相互排斥
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");


        if (null != jobConfigurationProperties.getElasticJobClass()) {
            //通過class 註入job
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            //通過type 註入job
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

Job 有兩種類型的註入,第一種是是class,配置成job的全路徑信息註入   再來看看registerClassedJob 方法里的內容
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    //獲取job配置
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    //配置job事件追蹤
    jobExtraConfigurations(jobConfig, tracingConfig);
    //獲取job類型
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    //沒有配置cron表達式 就初始化為OneOffJobBootstrap對象,一次性任務
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        //有配置cron表達式 就初始化為ScheduleJobBootstrap對象,定時任務
        //設置bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        //註入ScheduleJobBootstrap對象為單利對象
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

Class 類型註入的job有兩種類型 1.ScheduleJobBootstrap:定時任務類型的job。 2.OneOffJobBootstrap:一定次job類型。   看下定義的new ScheduleJobBootstrap 方法
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    //獲取job監聽器
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // 集成所有操作zookeeper 節點的services,job 監聽器
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    //獲取當前job名稱
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    //zookeeper節點 {namespace}/{jobclassname}/config 放置job配置信息
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // 集成所有操作zookeeper 節點的services
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    //檢驗job配置
    validateJobProperties();
    //定義job執行器
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    //監聽器里註入GuaranteeService
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    //創建定時任務,開始執行
    jobScheduleController = createJobScheduleController();
}

 

看下createJobScheduleController

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    //註冊job
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    //註冊器開始運行
    registerStartUpInfo();
    return result;
}

看下registerStartUpInfo方法

public void registerStartUpInfo(final boolean enabled) {
    //開始所有的監聽器
    listenerManager.startAllListeners();
    //選舉leader /{namespace}/leader/election/instance 放置選舉出來的伺服器
    leaderService.electLeader();
    //{namespace}/{ipservers} 設置enable處理
    serverService.persistOnline(enabled);
    //臨時節點   /{namespave}/instances 放置運行服務實例信息
    instanceService.persistOnline();
    //開啟一個非同步服務
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

這裡實行的操作: 1.開啟所有監聽器處理 2.leader選舉 3.持久化節點數據 4.開啟非同步服務   第四步:4.識別job類型,在zookeeper節點上處理job節點數據,運行定時任務job.  
@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

獲取到所有的定時任務job(ScheduleJobBootstrap類型),執行schedule方法,底層實際使用quartz框架運行定時任務。          
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景 項目中有一個系統使用的微前端,主站使用是vue2實現的,使用的是vue-router3.x。子應用有使用vue3實現的,使用的為vue-router4.x。 該子應用中的頁面A有通過操作按鈕觸發跳轉到其他子應用頁面B的需求,此時使用的是vue-router4.x的編程式導航API。 當通過點擊 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 工作中遇到一個需求是根據日曆查看某一天/某一周/某一月的睡眠報告,但是找了好多日曆組件都不是很符合需求,只好自己手寫一個日曆組件,順便記錄一下。 先看看UI給的設計圖和,需求是有數據的日期做標記,可以查看某一周/某一月的數據,周數據不用自 ...
  • 在JavaScript中,Map、Set、WeakMap和WeakSet是四個不同的數據結構,它們都有不同的特點和用途: 1. Map :Map是一種鍵值對的集合,其中的鍵和值可以是任意類型的。與對象類似,它們可以通過鍵來訪問值。不同之處在於,Map可以使用任意類型作為鍵,而對象只能使用字元串或Sy ...
  • 門面模式和適配器模式是代碼級的設計模式,而防腐層本質是一種防禦型策略,在更高的層級對系統進行解耦。通常情況下,防腐層包含一系列的門面類和適配器類以及一些轉換器類。 ...
  • 微服務網關作為微服務後端服務的統一入口,它可以統籌管理後端服務,主要分為數據平面和控制平面: 數據平面主要功能是接入用戶的HTTP請求和微服務被拆分後的聚合。使用微服務網關統一對外暴露後端服務的API和契約,路由和過濾功能正是網關的核心能力模塊。另外,微服務網關可以實現攔截機制和專註跨橫切麵的功能... ...
  • 淄博燒烤爆紅出了圈,當你坐在八大局的燒烤攤,面前是火爐、烤串、小餅和蘸料,音樂響起,啤酒倒滿,燒烤靈魂的party即將開場的時候,你系統中的Scheduler(調試器),也自動根據設定的Trigger(觸發器),從容優雅的啟動了一系列的Job(後臺定時任務)。工作一切早有安排,又何須費心勞神呢?因為 ...
  • 監聽事件 ​ 監聽事件機制由事件源,事件和事件監聽器三類對象組成,事件源一般就是activity中的UI控制項。 下麵引用別人整理的圖來更加形象的表達這些關係。 ​ 事件監聽機制的意義就是讓事件源的行為委托給事件監聽器,讓監聽器控制事件的發生。 ​ 1.實現監聽事件的方法 [ ] 通過內部類實現 [ ...
  • 本文首發於公眾號:Hunter後端 原文鏈接:Django筆記三十一之全局異常處理 這一篇筆記介紹 Django 的全局異常處理。 當我們在處理一個 request 請求時,會儘可能的對介面數據的格式,內部調用的函數做一些異常處理,但可能還是會有一些意想不到的漏網之魚,造成程式的異常導致不能正常運行 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...