自定義 ThreadPoolExecutor 處理線程運行時異常 最近看完了 "ElasticSearch線程池模塊" 的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子,對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實 ...
自定義 ThreadPoolExecutor 處理線程運行時異常
最近看完了ElasticSearch線程池模塊的源碼,感觸頗深,然後也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子(源碼在這裡),對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實現自定義的線程池時,ES先重寫了Runnable介面,提供了更靈活的任務運行過程中出現異常處理邏輯。簡而言之,它採用回調機制實現了線程在運行過程中拋出未受檢異常的統一處理邏輯,非常優美。實在忍不住把源碼copy下來:
/**
* An extension to runnable.
*/
public abstract class AbstractRunnable implements Runnable {
/**
* Should the runnable force its execution in case it gets rejected?
*/
public boolean isForceExecution() {
return false;
}
@Override
public final void run() {
try {
doRun();
} catch (Exception t) {
onFailure(t);
} finally {
onAfter();
}
}
/**
* This method is called in a finally block after successful execution
* or on a rejection.
*/
public void onAfter() {
// nothing by default
}
/**
* This method is invoked for all exception thrown by {@link #doRun()}
*/
public abstract void onFailure(Exception e);
/**
* This should be executed if the thread-pool executing this action rejected the execution.
* The default implementation forwards to {@link #onFailure(Exception)}
*/
public void onRejection(Exception e) {
onFailure(e);
}
/**
* This method has the same semantics as {@link Runnable#run()}
* @throws InterruptedException if the run method throws an InterruptedException
*/
protected abstract void doRun() throws Exception;
}
統一的任務執行入口方法doRun(),由各個子類實現doRun()執行具體的業務邏輯
try-catch中統一處理線程執行任務過程中拋出的異常,由onFailure()處理
任務執行完成(不管是正常結束還是運行過程中拋出了異常),統一由onAfter()處理
isForceExecution
方法,用來支持任務在提交給線程池被拒絕了,強制執行。當然了,這需要線程池的任務隊列提供相關的支持。我也是受這種方式的啟發,實現了一個線程在執行任務過程中拋出未受檢異常時,先判斷該任務是否允許強制執行isForceExecution,然後再重新提交任務運行的線程池。
此外,ES內置了好幾個預設實現的線程池,比如 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。
QueueResizingEsThreadPoolExecutor
在創建線程池時會指定一個任務隊列(BlockingQueue),平常都是直接用 LinkedBlockingQueue,它是一個無界隊列,當然也可以在構造方法中指定隊列的長度。但是,ES中幾乎不用 LinkedBlockingQueue 作為任務隊列,而是使用 LinkedTransferQueue ,但是 LinkedTransferQueue 又是一個無界隊列,於是ES又基於LinkedTransferQueue 封裝了一個任務隊列,類名稱為 ResizableBlockingQueue,它能夠限制任務隊列的長度。
那麼問題來了,對於一個線程池,任務隊列設置為多長合適呢?
答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 線程池中重寫了afterExecute()方法,裡面統計了每個任務的運行時間、等待時間(入隊列到執行)。所以,你想知道如何統計一個任務的運行時間嗎?你想統計線程池一共提交了多少個任務,所有任務的運行時間嗎?看看QueueResizingEsThreadPoolExecutor 源碼就明白了。
另外再提一個問題,為什麼ES用 LinkedTransferQueue 作為任務隊列而不用 LinkedBlockingQueue 呢?
我想:很重要的一個原因是LinkedBlockingQueue 是基於重量級的鎖實現的入隊操作,而LinkedTransferQueue 是基於CAS原子指令實現的入隊操作。那麼這就是synchronized內置鎖和CAS原子指令之間的一些差異了,你懂得。
PrioritizedEsThreadPoolExecutor
優先順序任務的線程池,任務提交給線程池後是在任務隊列裡面排隊,FIFO模式。而這個線程池則允許任務定義一個優先順序,優先順序高的任務先執行。
EsThreadPoolExecutor
這個線程池最接近經常見到的ThreadPoolExecutor,不過,它實現了一些拒絕處理邏輯,提交任務若被拒絕(會拋出EsRejectedExecutionException異常),則進行相關處理
@Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } }
講完了ES中常用的三個線程池實現,還想結合JDK源碼,記錄一下線程在執行任務過程中拋出運行時異常,是如何處理的。我覺得有二種方式(或者說有2個地方)來處理運行時異常。一種方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另一種方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException
afterExecute
看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源碼註釋:
Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.
提交給線程池的任務,執行完(不管是正常結束,還是執行過程中出現了異常)後都會自動調用afterExecute()方法。如果執行過程中出現了異常,那麼Throwable t 就不為null,並且導致執行終止(terminate abruptly.)。
This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.
預設的afterExecute(Runnable r, Throwable t) 方法是一個空實現,什麼也沒有。因此,在繼承ThreadPoolExecutor實現自己的線程池時,如果重寫該方法,則要記住:先調用
super.afterExecute
比如說這樣乾:
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { //出現了異常 if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) { //AbstractRunnable 設置為強制執行時重新拉起任務 execute(r); logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage()); } } }
看,重寫afterExecute方法,當 Throwable 不為null時,表明線程執行任務過程中出現了異常,這時就重新提交任務。
有個時候,在實現 Kafka 消費者線程的時候(while true迴圈),經常因為解析消息出錯導致線程拋出異常,就會導致 Kafka消費者線程掛掉,這樣就永久丟失了一個消費者了。而通過這種方式,當消費者線程掛了時,可重新拉起一個新任務。
uncaughtException
創建 ThreadPoolExecutor時,要傳入ThreadFactory 作為參數,在而創建ThreadFactory 對象時,就可以設置線程的異常處理器java.lang.Thread.UncaughtExceptionHandler。
在用Google Guava包的時候,一般這麼乾:
//先 new Thread.UncaughtExceptionHandler對象 exceptionHandler private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
線上程執行任務過程中,如果拋出了異常,就會由JVM調用 Thread.UncaughtExceptionHandler 中實現的異常處理邏輯。看Thread.UncaughtExceptionHandler的JDK源碼註釋:
Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.
When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.
其大意就是:如果線程在執行Runnable任務過程因為 uncaught exception 而終止了,那麼 JVM 就會調用getUncaughtExceptionHandler 方法查找是否設置了異常處理器,如果設置了,那就就會調用異常處理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,這樣我們就可以在這個方法裡面定義異常處理邏輯了。
總結
ES的ThreadPool 模塊是學習線程池的非常好的一個示例,實踐出真知。它告訴你如何自定義線程池(用什麼任務隊列?cpu核數、任務隊列長度等參數如何配置?)。在實現自定義任務隊列過程中,也進一步理解了CAS操作的原理,如何巧妙地使用CAS?是失敗重試呢?還是直接返回?
線程在執行Runnable任務過程中拋出了異常如何處理?這裡提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自動調用的,後者則是在每個任務執行結束後都會被調用。
Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不同的。RejectedExecutionHandler 用來處理任務在提交的時候,被線程池拒絕了,該怎麼辦的問題,預設是AbortPolicy,即:直接丟棄。
- 等下次有時間,好好地寫一篇分析ElasticSearch6.3.2的線程池模塊。:)
Lucene 源碼 org.apache.lucene.util.CloseableThreadLocal 解決了使用JDK ThreadLocal 時 JAVA對象 長期駐留記憶體得不到及時清除的問題,也值得好好分析一番 :)
原文:https://www.cnblogs.com/hapjin/p/10617702.html