自定義 ThreadPoolExecutor 處理線程運行時異常

来源:https://www.cnblogs.com/hapjin/archive/2019/03/29/10617702.html
-Advertisement-
Play Games

自定義 ThreadPoolExecutor 處理線程運行時異常 最近看完了 "ElasticSearch線程池模塊" 的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子,對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實 ...


自定義 ThreadPoolExecutor 處理線程運行時異常

最近看完了ElasticSearch線程池模塊的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子(源碼在這裡),對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實現自定義的線程池時,ES先重寫了Runnable介面,提供了更靈活的任務運行過程中出現異常處理邏輯。簡而言之,它採用回調機制實現了線程在運行過程中拋出未受檢異常的統一處理邏輯,非常優美。實在忍不住把源碼copy下來:

/**
 * An extension to runnable.
 */
public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  1. 統一的任務執行入口方法doRun(),由各個子類實現doRun()執行具體的業務邏輯

  2. try-catch中統一處理線程執行任務過程中拋出的異常,由onFailure()處理

  3. 任務執行完成(不管是正常結束還是運行過程中拋出了異常),統一由onAfter()處理

  4. isForceExecution方法,用來支持任務在提交給線程池被拒絕了,強制執行。當然了,這需要線程池的任務隊列提供相關的支持。我也是受這種方式的啟發,實現了一個線程在執行任務過程中拋出未受檢異常時,先判斷該任務是否允許強制執行isForceExecution,然後再重新提交任務運行的線程池

此外,ES內置了好幾個預設實現的線程池,比如 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。

  1. QueueResizingEsThreadPoolExecutor

    在創建線程池時會指定一個任務隊列(BlockingQueue),平常都是直接用 LinkedBlockingQueue,它是一個無界隊列,當然也可以在構造方法中指定隊列的長度。但是,ES中幾乎不用 LinkedBlockingQueue 作為任務隊列,而是使用 LinkedTransferQueue ,但是 LinkedTransferQueue 又是一個無界隊列,於是ES又基於LinkedTransferQueue 封裝了一個任務隊列,類名稱為 ResizableBlockingQueue,它能夠限制任務隊列的長度。

    那麼問題來了,對於一個線程池,任務隊列設置為多長合適呢?

    答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 線程池中重寫了afterExecute()方法,裡面統計了每個任務的運行時間、等待時間(入隊列到執行)。所以,你想知道如何統計一個任務的運行時間嗎?你想統計線程池一共提交了多少個任務,所有任務的運行時間嗎?看看QueueResizingEsThreadPoolExecutor 源碼就明白了。

    另外再提一個問題,為什麼ES用 LinkedTransferQueue 作為任務隊列而不用 LinkedBlockingQueue 呢?

    我想:很重要的一個原因是LinkedBlockingQueue 是基於重量級的鎖實現的入隊操作,而LinkedTransferQueue 是基於CAS原子指令實現的入隊操作。那麼這就是synchronized內置鎖和CAS原子指令之間的一些差異了,你懂得。

  2. PrioritizedEsThreadPoolExecutor

    優先順序任務的線程池,任務提交給線程池後是在任務隊列裡面排隊,FIFO模式。而這個線程池則允許任務定義一個優先順序,優先順序高的任務先執行。

  3. EsThreadPoolExecutor

    這個線程池最接近經常見到的ThreadPoolExecutor,不過,它實現了一些拒絕處理邏輯,提交任務若被拒絕(會拋出EsRejectedExecutionException異常),則進行相關處理

        @Override
        public void execute(final Runnable command) {
            doExecute(wrapRunnable(command));
        }
    
        protected void doExecute(final Runnable command) {
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
    
                    }
                } else {
                    throw ex;
                }
            }
        }

講完了ES中常用的三個線程池實現,還想結合JDK源碼,記錄一下線程在執行任務過程中拋出運行時異常,是如何處理的。我覺得有二種方式(或者說有2個地方)來處理運行時異常。一種方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另一種方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException

  1. afterExecute

    看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源碼註釋:

    Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.

    提交給線程池的任務,執行完(不管是正常結束,還是執行過程中出現了異常)後都會自動調用afterExecute()方法。如果執行過程中出現了異常,那麼Throwable t 就不為null,並且導致執行終止(terminate abruptly.)。

    This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.

    預設的afterExecute(Runnable r, Throwable t) 方法是一個空實現,什麼也沒有。因此,在繼承ThreadPoolExecutor實現自己的線程池時,如果重寫該方法,則要記住:先調用 super.afterExecute

    比如說這樣乾:

     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t != null) {
             //出現了異常
             if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) {
                 //AbstractRunnable 設置為強制執行時重新拉起任務
                 execute(r);
                 logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage());
             }
         }
     }

    看,重寫afterExecute方法,當 Throwable 不為null時,表明線程執行任務過程中出現了異常,這時就重新提交任務。

    有個時候,在實現 Kafka 消費者線程的時候(while true迴圈),經常因為解析消息出錯導致線程拋出異常,就會導致 Kafka消費者線程掛掉,這樣就永久丟失了一個消費者了。而通過這種方式,當消費者線程掛了時,可重新拉起一個新任務。

  2. uncaughtException

    創建 ThreadPoolExecutor時,要傳入ThreadFactory 作為參數,在而創建ThreadFactory 對象時,就可以設置線程的異常處理器java.lang.Thread.UncaughtExceptionHandler。

    在用Google Guava包的時候,一般這麼乾:

    //先 new  Thread.UncaughtExceptionHandler對象 exceptionHandler
    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
    

    線上程執行任務過程中,如果拋出了異常,就會由JVM調用 Thread.UncaughtExceptionHandler 中實現的異常處理邏輯。看Thread.UncaughtExceptionHandler的JDK源碼註釋:

    Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.

    When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.

    其大意就是:如果線程在執行Runnable任務過程因為 uncaught exception 而終止了,那麼 JVM 就會調用getUncaughtExceptionHandler 方法查找是否設置了異常處理器,如果設置了,那就就會調用異常處理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,這樣我們就可以在這個方法裡面定義異常處理邏輯了。

總結

ES的ThreadPool 模塊是學習線程池的非常好的一個示例,實踐出真知。它告訴你如何自定義線程池(用什麼任務隊列?cpu核數、任務隊列長度等參數如何配置?)。在實現自定義任務隊列過程中,也進一步理解了CAS操作的原理,如何巧妙地使用CAS?是失敗重試呢?還是直接返回?

  • 線程在執行Runnable任務過程中拋出了異常如何處理?這裡提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自動調用的,後者則是在每個任務執行結束後都會被調用。

  • Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不同的。RejectedExecutionHandler 用來處理任務在提交的時候,被線程池拒絕了,該怎麼辦的問題,預設是AbortPolicy,即:直接丟棄。

  • 等下次有時間,好好地寫一篇分析ElasticSearch6.3.2的線程池模塊。:)
  • Lucene 源碼 org.apache.lucene.util.CloseableThreadLocal 解決了使用JDK ThreadLocal 時 JAVA對象 長期駐留記憶體得不到及時清除的問題,也值得好好分析一番 :)
    原文:https://www.cnblogs.com/hapjin/p/10617702.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 用python轉換jupyter成pdf 關註公眾號「**Python專欄**」,後臺回覆「**zsxq06**」,獲取本文全套源碼! ...
  • Java環境配置 1.JDK下載安裝 進入oracle官網選擇jdk下載(快捷鏈接:https://www.oracle.com/technetwork/java/javase/downloads/index.html) 選擇jdk版本(記住jdk的安裝路徑) 2.環境變數配置 右鍵我的電腦——點擊 ...
  • 問題如圖 需要添加一個導入 這樣就不會報錯了 ...
  • Maven生成jar包時, 怎樣把項目中依賴的jar包也包括進去? 這裡介紹2種方式: 使用 shade插件, 和使用 assembly插件. 另外擴展Maven安裝本地jar包到本地倉庫的方法、手動生成jar包的方法、Linux下運行jar包的幾種方式. ...
  • Hello Flask Flask簡介 Flask是一個使用Python編寫的輕量級Web應用框架。基於Werkzeug WSGI工具箱和Jinja2 模板引擎。Flask使用BSD授權。Flask被稱為“microframework”,因為它使用簡單的核心,用extension增加其他功能。Fla ...
  • 1、引入mybatis.jar mysql-connector-java-5.1.47.jar 2、新增資料庫資源文件datasource.properties 3、新增mybatis.xml 4、新增UserDomain.java 5、新增UserMapper.java 6、新增UserMappe ...
  • 什麼是動態代理呢?就是在java的運行過程中,動態的生成的代理類。(為了更熟悉的瞭解動態代理,你必須先熟悉代理模式,可點擊設計模式之代理模式 閱讀)我們知道java屬於解釋型語言,是在運行過程中,尋找位元組碼文件從而實現類載入的。但是位元組碼文件並不需要一定是硬碟中的class文件,也可以是來自網路、數 ...
  • @Data註解主要是幫助解決Setter 和 Getter以及 toString這種重覆的無腦工作 加入@Data註解可以直接幫助我們添加實體類相應的Setter 和 Getter以及 toString 需要自己添加jar包或pom依賴 然後我們可以直接調用相應的Setter 和 Getter以及 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...