自定義 ThreadPoolExecutor 處理線程運行時異常

来源:https://www.cnblogs.com/hapjin/archive/2019/03/29/10617702.html
-Advertisement-
Play Games

自定義 ThreadPoolExecutor 處理線程運行時異常 最近看完了 "ElasticSearch線程池模塊" 的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子,對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實 ...


自定義 ThreadPoolExecutor 處理線程運行時異常

最近看完了ElasticSearch線程池模塊的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子(源碼在這裡),對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實現自定義的線程池時,ES先重寫了Runnable介面,提供了更靈活的任務運行過程中出現異常處理邏輯。簡而言之,它採用回調機制實現了線程在運行過程中拋出未受檢異常的統一處理邏輯,非常優美。實在忍不住把源碼copy下來:

/**
 * An extension to runnable.
 */
public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  1. 統一的任務執行入口方法doRun(),由各個子類實現doRun()執行具體的業務邏輯

  2. try-catch中統一處理線程執行任務過程中拋出的異常,由onFailure()處理

  3. 任務執行完成(不管是正常結束還是運行過程中拋出了異常),統一由onAfter()處理

  4. isForceExecution方法,用來支持任務在提交給線程池被拒絕了,強制執行。當然了,這需要線程池的任務隊列提供相關的支持。我也是受這種方式的啟發,實現了一個線程在執行任務過程中拋出未受檢異常時,先判斷該任務是否允許強制執行isForceExecution,然後再重新提交任務運行的線程池

此外,ES內置了好幾個預設實現的線程池,比如 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。

  1. QueueResizingEsThreadPoolExecutor

    在創建線程池時會指定一個任務隊列(BlockingQueue),平常都是直接用 LinkedBlockingQueue,它是一個無界隊列,當然也可以在構造方法中指定隊列的長度。但是,ES中幾乎不用 LinkedBlockingQueue 作為任務隊列,而是使用 LinkedTransferQueue ,但是 LinkedTransferQueue 又是一個無界隊列,於是ES又基於LinkedTransferQueue 封裝了一個任務隊列,類名稱為 ResizableBlockingQueue,它能夠限制任務隊列的長度。

    那麼問題來了,對於一個線程池,任務隊列設置為多長合適呢?

    答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 線程池中重寫了afterExecute()方法,裡面統計了每個任務的運行時間、等待時間(入隊列到執行)。所以,你想知道如何統計一個任務的運行時間嗎?你想統計線程池一共提交了多少個任務,所有任務的運行時間嗎?看看QueueResizingEsThreadPoolExecutor 源碼就明白了。

    另外再提一個問題,為什麼ES用 LinkedTransferQueue 作為任務隊列而不用 LinkedBlockingQueue 呢?

    我想:很重要的一個原因是LinkedBlockingQueue 是基於重量級的鎖實現的入隊操作,而LinkedTransferQueue 是基於CAS原子指令實現的入隊操作。那麼這就是synchronized內置鎖和CAS原子指令之間的一些差異了,你懂得。

  2. PrioritizedEsThreadPoolExecutor

    優先順序任務的線程池,任務提交給線程池後是在任務隊列裡面排隊,FIFO模式。而這個線程池則允許任務定義一個優先順序,優先順序高的任務先執行。

  3. EsThreadPoolExecutor

    這個線程池最接近經常見到的ThreadPoolExecutor,不過,它實現了一些拒絕處理邏輯,提交任務若被拒絕(會拋出EsRejectedExecutionException異常),則進行相關處理

        @Override
        public void execute(final Runnable command) {
            doExecute(wrapRunnable(command));
        }
    
        protected void doExecute(final Runnable command) {
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
    
                    }
                } else {
                    throw ex;
                }
            }
        }

講完了ES中常用的三個線程池實現,還想結合JDK源碼,記錄一下線程在執行任務過程中拋出運行時異常,是如何處理的。我覺得有二種方式(或者說有2個地方)來處理運行時異常。一種方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另一種方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException

  1. afterExecute

    看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源碼註釋:

    Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.

    提交給線程池的任務,執行完(不管是正常結束,還是執行過程中出現了異常)後都會自動調用afterExecute()方法。如果執行過程中出現了異常,那麼Throwable t 就不為null,並且導致執行終止(terminate abruptly.)。

    This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.

    預設的afterExecute(Runnable r, Throwable t) 方法是一個空實現,什麼也沒有。因此,在繼承ThreadPoolExecutor實現自己的線程池時,如果重寫該方法,則要記住:先調用 super.afterExecute

    比如說這樣乾:

     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t != null) {
             //出現了異常
             if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) {
                 //AbstractRunnable 設置為強制執行時重新拉起任務
                 execute(r);
                 logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage());
             }
         }
     }

    看,重寫afterExecute方法,當 Throwable 不為null時,表明線程執行任務過程中出現了異常,這時就重新提交任務。

    有個時候,在實現 Kafka 消費者線程的時候(while true迴圈),經常因為解析消息出錯導致線程拋出異常,就會導致 Kafka消費者線程掛掉,這樣就永久丟失了一個消費者了。而通過這種方式,當消費者線程掛了時,可重新拉起一個新任務。

  2. uncaughtException

    創建 ThreadPoolExecutor時,要傳入ThreadFactory 作為參數,在而創建ThreadFactory 對象時,就可以設置線程的異常處理器java.lang.Thread.UncaughtExceptionHandler。

    在用Google Guava包的時候,一般這麼乾:

    //先 new  Thread.UncaughtExceptionHandler對象 exceptionHandler
    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
    

    線上程執行任務過程中,如果拋出了異常,就會由JVM調用 Thread.UncaughtExceptionHandler 中實現的異常處理邏輯。看Thread.UncaughtExceptionHandler的JDK源碼註釋:

    Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.

    When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.

    其大意就是:如果線程在執行Runnable任務過程因為 uncaught exception 而終止了,那麼 JVM 就會調用getUncaughtExceptionHandler 方法查找是否設置了異常處理器,如果設置了,那就就會調用異常處理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,這樣我們就可以在這個方法裡面定義異常處理邏輯了。

總結

ES的ThreadPool 模塊是學習線程池的非常好的一個示例,實踐出真知。它告訴你如何自定義線程池(用什麼任務隊列?cpu核數、任務隊列長度等參數如何配置?)。在實現自定義任務隊列過程中,也進一步理解了CAS操作的原理,如何巧妙地使用CAS?是失敗重試呢?還是直接返回?

  • 線程在執行Runnable任務過程中拋出了異常如何處理?這裡提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自動調用的,後者則是在每個任務執行結束後都會被調用。

  • Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不同的。RejectedExecutionHandler 用來處理任務在提交的時候,被線程池拒絕了,該怎麼辦的問題,預設是AbortPolicy,即:直接丟棄。

  • 等下次有時間,好好地寫一篇分析ElasticSearch6.3.2的線程池模塊。:)
  • Lucene 源碼 org.apache.lucene.util.CloseableThreadLocal 解決了使用JDK ThreadLocal 時 JAVA對象 長期駐留記憶體得不到及時清除的問題,也值得好好分析一番 :)
    原文:https://www.cnblogs.com/hapjin/p/10617702.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 用python轉換jupyter成pdf 關註公眾號「**Python專欄**」,後臺回覆「**zsxq06**」,獲取本文全套源碼! ...
  • Java環境配置 1.JDK下載安裝 進入oracle官網選擇jdk下載(快捷鏈接:https://www.oracle.com/technetwork/java/javase/downloads/index.html) 選擇jdk版本(記住jdk的安裝路徑) 2.環境變數配置 右鍵我的電腦——點擊 ...
  • 問題如圖 需要添加一個導入 這樣就不會報錯了 ...
  • Maven生成jar包時, 怎樣把項目中依賴的jar包也包括進去? 這裡介紹2種方式: 使用 shade插件, 和使用 assembly插件. 另外擴展Maven安裝本地jar包到本地倉庫的方法、手動生成jar包的方法、Linux下運行jar包的幾種方式. ...
  • Hello Flask Flask簡介 Flask是一個使用Python編寫的輕量級Web應用框架。基於Werkzeug WSGI工具箱和Jinja2 模板引擎。Flask使用BSD授權。Flask被稱為“microframework”,因為它使用簡單的核心,用extension增加其他功能。Fla ...
  • 1、引入mybatis.jar mysql-connector-java-5.1.47.jar 2、新增資料庫資源文件datasource.properties 3、新增mybatis.xml 4、新增UserDomain.java 5、新增UserMapper.java 6、新增UserMappe ...
  • 什麼是動態代理呢?就是在java的運行過程中,動態的生成的代理類。(為了更熟悉的瞭解動態代理,你必須先熟悉代理模式,可點擊設計模式之代理模式 閱讀)我們知道java屬於解釋型語言,是在運行過程中,尋找位元組碼文件從而實現類載入的。但是位元組碼文件並不需要一定是硬碟中的class文件,也可以是來自網路、數 ...
  • @Data註解主要是幫助解決Setter 和 Getter以及 toString這種重覆的無腦工作 加入@Data註解可以直接幫助我們添加實體類相應的Setter 和 Getter以及 toString 需要自己添加jar包或pom依賴 然後我們可以直接調用相應的Setter 和 Getter以及 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...