java實現任務調度

来源:https://www.cnblogs.com/skyice/archive/2018/09/23/9691736.html
-Advertisement-
Play Games

最近的一個小項目是做一個簡單的數據倉庫,需要將其他資料庫的數據抽取出來,並通過而出抽取成頁面需要的數據,以空間換時間的方式,讓後端報表查詢更快。 因為在抽取的過程中,有一定的先後順序,需要做一個任務調度器,某一優先順序的會先執行,然後會進入下一個優先順序的隊列任務中。 先定義了一個Map的集合,key是 ...


最近的一個小項目是做一個簡單的數據倉庫,需要將其他資料庫的數據抽取出來,並通過而出抽取成頁面需要的數據,以空間換時間的方式,讓後端報表查詢更快。
因為在抽取的過程中,有一定的先後順序,需要做一個任務調度器,某一優先順序的會先執行,然後會進入下一個優先順序的隊列任務中。
先定義了一個Map的集合,key是優先順序,value是任務的集合,某一個優先順序內的任務是併發執行的,而不同優先順序是串列執行的,前一個優先順序執行完之後,後面的才會執行。

ConcurrentHashMap<Integer/* 優先順序. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>();

這個調度管理有一個演進的過程,我先說第一個,這個是比較好理解的。
第一個版本:
首先對tasks集合中的key進行一個排序,我定義的是數字越小就有限執行,則進行遍歷key值,並取出某個優先順序的任務隊列,執行任務隊列的任務。任務的執行交給線程池去執行,在遍歷內部,需要不斷的檢查這個隊列中的任務是否都執行了,沒有則一直等待否則進入到下個隊列,任務執行的時候可能會拋出異常,但是不管任務是否異常,都將任務狀態設置已執行。
下麵是其核心代碼:

public void run() {
    //對key值進行排序
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    //對key進行遍歷,執行某個某個優先順序的任務隊列
    for (Integer priority : prioritys) {
      List<BaseTask> taskList = tasks.get(priority);
      if (taskList.isEmpty()) {
        continue;
      }
      logger.info("execute priority {} task ", taskList.get(0).priority);
      for (BaseTask task : taskList) {
        executor.execute(() -> {
          try {
            task.doTask();
          } catch (Exception e) {
            e.printStackTrace();
          }
        });//線程中執行任務
      }
      while (true) {//等待所有線程都執行完成之後執行下一個任務隊列
        boolean finish = true;
        for (BaseTask t : taskList) {
          if (!t.finish) {
            finish = false;
          }
        }
        if (finish) {//當前任務都執行完畢
          break;
        }
        Misc.sleep(1000);//Thread.sleep(1000)
      }
      Misc.sleep(1000);
    }
  }

關鍵代碼很好理解,在任務執行之前,需要對所有任務都初始化,初始化的時候給出每個任務的優先順序和任務名稱,任務抽象類如下:

public abstract class BaseTask {
  public String taskName;//任務名稱
  public Integer priority; //優先順序
  public boolean finish; //任務完成?
  /**
   * 執行的任務
   */
  public abstract void doTask(Date date) throws Exception;

第一個版本的思路很簡單。
第二個版本稍微有一點點複雜。這裡主要介紹該版本的內容,後續將代碼的鏈接附上。
程式是由SpringBoot搭建起來的,定時器是Spring內置的輕量級的Quartz,使用Aop方式攔截異常,使用註解的方式在任務初始化時設置任務的初始變數。使用EventBus解耦程式,其中程式簡單實現郵件發送功能(該功能還需要自己配置參數),以上這些至少需要簡單的瞭解一下。
程式的思路:在整個隊列執行過程中會有多個管道,某個隊列上的管道任務執行完成,可以直接進行到下一個隊列中執行,也設置了等待某一個隊列上的所有任務都執行完成才執行當前任務。在某個隊列任務中會標識某些任務是一隊的,其他的為另一隊,當這一隊任務執行完成,就可以到下一個隊列中去,不需要等待另一隊。
這裡會先初始化每個隊列的每個隊的條件,這個條件就是每個隊的任務數,執行完成減1,當為0時,就進入下一個隊列中。
分四個步驟進行完成:
1.bean的初始化
2.條件的設置
3.任務的執行
4.任務異常和任務執行完成之後通知檢查是否執行下一個隊列的任務

1.bean的初始化

1.創建註解類

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface TaskAnnotation {

  int priority() default 0;//優先順序
  String taskName() default "";//任務名稱
  TaskQueueEnum[] queueName() default {};//隊列名稱
}

2.實現BeanPostProcessor,該介面是中有兩個方法postProcessBeforeInitialization和postProcessAfterInitialization,分別是bean初始化之前和bean初始化之後做的事情。

    Annotation[] annotations = bean.getClass().getAnnotations();//獲取類上的註解
    if (ArrayUtils.isEmpty(annotations)) {//註解為空時直接返回(不能返回空,否則bean不會被載入)
        return bean;
    }
    for (Annotation annotation : annotations) {
        if (annotation.annotationType().equals(TaskAnnotation.class)) {
          TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉
           try {
            Field[] fields = target.getClass().getFields();//需要通過反射將值進行修改,下麵的操作僅僅是對象的引用
            if (!ArrayUtils.isEmpty(fields)) {
              for (Field f : fields) {
                f.setAccessible(true);
                if (f.getName().equals("priority")) {
                  f.set(target, taskAnnotation.priority());
                }
            }
          }
     }
  }

上面需要註意的一點是需要通過反射的機制給bean設置值,不能直接調用bean的方式set值,否則bean的值是空的。
上面的代碼通過實現BeanPostProcessor後置處理器,處理任務上的註解,完成對任務的初始化的。

2.條件的初始化

創建條件類,提供初始化的方法。

public abstract class BaseTask {

  public int nextPriority;//子級節點的優先順序

  public String taskName;//任務名稱

  public Integer priority; //優先順序

  public String queueName;//隊列名稱

  public boolean finish; //任務完成?

  public boolean allExecute;

  /**
   * 執行的任務
   */
  public abstract void doTask(Date date) throws Exception;

    //任務完成之後,通過eventBus發送通知,是否需要執行下一個隊列
  public void notifyExecuteTaskMsg(EventBus eventBus, Date date) {
    EventNotifyExecuteTaskMsg msg = new EventNotifyExecuteTaskMsg();
    msg.setDate(date);
    msg.setNextPriority(nextPriority);
    msg.setQueueName(queueName);
    msg.setPriority(priority);
    msg.setTaskName(taskName);
    eventBus.post(msg);
  }
}

public class TaskExecuteCondition {

  private ConcurrentHashMap<String, AtomicInteger> executeMap = new ConcurrentHashMap<>();

  /**
   * 初始化,每個隊列進行分組,每個組的任務數量放入map集合中.
   */
  public void init(ConcurrentHashMap<Integer, List<BaseTask>> tasks) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    for (Integer priority : prioritys) {
      List<BaseTask> list = tasks.get(priority);
      if (list.isEmpty()) {
        continue;
      }
      //對每個隊列進行分組
      Map<String, List<BaseTask>> collect = list.stream()
          .collect(Collectors.groupingBy(x -> x.queueName, Collectors.toList()));
      for (Entry<String, List<BaseTask>> entry : collect.entrySet()) {
        for (BaseTask task : entry.getValue()) {
          addCondition(task.priority, task.queueName);
        }
      }
    }
  }

  /**
   * 執行任務完成,條件減1
   */
  public boolean executeTask(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    int sum = count.decrementAndGet();
    if (sum == 0) {
      return true;
    }
    return false;
  }

  /**
   * 對個某個隊列的條件
   */
  public int getCondition(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    return executeMap.get(name).get();
  }

  private void addCondition(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    if (count == null) {
      count = new AtomicInteger(0);
      executeMap.put(name, count);
    }
    count.incrementAndGet();
  }

  private void addCondition(Integer priority, String queueName, int sum) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    if (count == null) {
      count = new AtomicInteger(sum);
      executeMap.put(name, count);
    } else {
      count.set(sum);
    }
  }


  private String getQueue(Integer priority, String queueName) {
    return priority + queueName;
  }

  /**
   * 清除隊列
   */
  public void clear() {
    this.executeMap.clear();
  }
}

3.任務的執行

任務執行類提供run方法,執行第一個隊列,並提供獲取下一個隊列優先順序方法,執行某個隊列某個組的方法。

public class ScheduleTask {
  private static final Logger logger = LoggerFactory.getLogger(ScheduleTask.class);
  
  public ConcurrentHashMap<Integer/* 優先順序. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>();

  @Autowired
  private ThreadPoolTaskExecutor executor;//線程池
    //任務會先執行第一隊列的任務.
  public void run(Date date) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    Integer priority = prioritys.get(0);
    executeTask(priority, date);//執行第一行的任務.
  }
    //獲取下一個隊列的優先順序
  public Integer nextPriority(Integer priority) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    for (Integer pri : prioritys) {
      if (priority < pri) {
        return pri;
      }
    }
    return null;//沒有下一個隊列
  }

  public void executeTask(Integer priority) {
    List<BaseTask> list = tasks.get(priority);
    if (list.isEmpty()) {
      return;
    }
    for (BaseTask task : list) {
      execute(task);
    }
  }
 //執行某個隊列的某個組
  public void executeTask(Integer priority, String queueName) {
    List<BaseTask> list = this.tasks.get(priority);
    list = list.stream().filter(task -> queueName.equals(task.queueName))
        .collect(Collectors.toList());
    if (list.isEmpty()) {
      return;
    }
    for (BaseTask task : list) {
      execute(task);
    }
  }

  public void execute(BaseTask task) {
    executor.execute(() -> {
      try {
        task.doTask(date);//
      } catch (Exception e) {//異常處理已經Aop攔截處理
      }
    });//線程中執行任務
  }

  /**
   * 增加任務
   */
  public void addTask(BaseTask task) {
    List<BaseTask> baseTasks = tasks.get(task.priority);
    if (baseTasks == null) {
      baseTasks = new ArrayList<>();
      List<BaseTask> putIfAbsent = tasks.putIfAbsent(task.priority, baseTasks);
      if (putIfAbsent != null) {
        baseTasks = putIfAbsent;
      }
    }
    baseTasks.add(task);
  }

  /**
   * 將任務結束標識重新設置
   */
  public void finishTask() {
    tasks.forEach((key, value) -> {
      for (BaseTask task : value) {
        task.finish = false;
      }
    });
  }
}

4.任務異常和任務執行完成之後通知檢查是否執行下一個隊列的任務

public class EventNotifyExecuteTaskListener {
  private static final Logger logger = LoggerFactory .getLogger(EventNotifyExecuteTaskListener.class);
  @Autowired
  private ScheduleTask scheduleTask;

  @Autowired
  private TaskExecuteCondition condition;

  @Subscribe
  public void executeTask(EventNotifyExecuteTaskMsg msg) {
  //當前隊列的某組內容是否都執行完成
    boolean success = condition.executeTask(msg.getPriority(), msg.getQueueName());
    if (success) {
      Integer nextPriority = scheduleTask.nextPriority(msg.getPriority());
      if (nextPriority != null) {
        scheduleTask.executeTask(nextPriority, msg.getQueueName(), msg.getDate());//執行下一個隊列
      } else {//執行完成,重置任務標識
        scheduleTask.finishTask();
        logger.info("CoreTask end!");
      }
    }
  }
}

整個思路介紹到這裡,那麼接下來是整個項目中出現的一些問題
1.BeanPostProcessor與Aop一起使用時,postProcessAfterInitialization調用之後獲取的bean分為不同的了,一個是jdk原生實體對象,一種是Aop註解下的類會被cglib代理,生成帶有尾碼的對象,如果通過這個對象時反射獲取類的註解,欄位和方法,就獲取不到,在代碼中,需要將其轉化一下,將cgLib代理之後的類轉化為不帶尾碼的對象。
2.postProcessAfterInitialization的參數bean不能直接設置值,就是如下:

 TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉
 BaseTask baseTask = (BaseTask) bean;//強轉
 baseTask.priority = taskAnnotation.priority();

在使用對象時,其中對象的欄位時為空的,並需要通過反射的方式去設置欄位的值。
上面僅僅只是個人的想法,如果有更好的方式,或者有某些地方可以進行改進的,我們可以共同探討一下。

鏈接地址:https://github.com/wangice/task-scheduler
程式中使用了一個公共包:https://github.com/wangice/misc


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 具體效果圖如下: 主要用到的技術除了3D翻轉和定位 ,還用到了一個新的屬性 backface-visibility:visable|hidden; 該屬性主要是用來設定元素背面是否可見。 具體的步驟如下: 1、寫出頁面主體, 2、通過定位使兩張圖片疊加在一起 3、設置第一張圖片背面不可見 4、添加旋 ...
  • 問題描述: 在 iOS 系統中,用微信打開了 A 頁面的鏈接,然後由 A 頁面進入 B 頁面 在 B 頁面打開微信右上角菜單,使用“複製鏈接”功能 最後粘貼出來的鏈接是 A 頁面的鏈接 分析原因: 這個問題在微信 6.2 時代就已存在,GitHub 上有很多人到 weui 的主頁提 issue ht ...
  • 本博文配合 阮一峰 《ES6 標準入門(第3版)》一書進行簡要概述 ES6 中變數的解構賦值。 數組的解構賦值 基本用法 ES6 允許按照一定模式,從數組和對象中提取值,對變數進行賦值,這被稱為解構。 ES6 以前,為變數賦值,只能直接指定值。 ES6 允許寫成下麵的樣式。 該代碼表示,可以從數組中 ...
  • 這是在網上看到的一道面試題 嗯 考察的知識點挺多 其他的就不多說了 我用我的理解與解題方式來解答這道題 1.首先是變數提升 變數提升包括var 聲明的變數和fucntion 聲明 舉個例子 1.var a=4; 2.function test(){ console.log(456); }; 函數變數 ...
  • css清除瀏覽器預設樣式(代替 *{}) 將代碼放入 css 文件,使用 link 引入。 ...
  • String能變化嗎?和StringBuffer的區別是什麼? ...
  • java中單態模式或單例模式(Singleton)有什麼意義? ...
  • 5.object和Class的深入理解 屬性和方法 (視頻下載) (全部書籍) 抽象Abstract:【新手可忽略不影響繼續學習】 (視頻下載) (全部書籍)很多java 的書中都談到了抽象abstract的概念,到底什麼是抽象?馬克-to-win:抽取關鍵相關特性(屬性和方法)構成對象,用程式的方 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...