java 線程池預設提供了幾種拒絕策略: 這幾個策略都實現了RejectedExecutionHandler,拿DiscardOldestPolicy來說,查看源碼: 核心代碼只有2行: e.getQueue().poll() 從列表裡彈出1個(最早的)任務,以便讓隊列空出1個位置 e.execut ...
java 線程池預設提供了幾種拒絕策略:
這幾個策略都實現了RejectedExecutionHandler,拿DiscardOldestPolicy來說,查看源碼:
核心代碼只有2行:
- e.getQueue().poll() 從列表裡彈出1個(最早的)任務,以便讓隊列空出1個位置
- e.execute(r) 新任務放入隊列執行
從這段代碼來看,如果有任務被丟棄(即:從隊列里彈出了),不會有任何報錯,也沒有日誌可查,實際使用中不太方便監控這種情況。
我們可以參考這段源碼,自定義策略:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class CustomDiscardPolicy implements RejectedExecutionHandler { //額外傳入1個名稱,方便打日誌或埋點監控時,定位問題 private String factoryName = ""; public CustomDiscardPolicy(String factoryName) { this.factoryName = factoryName; } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { Runnable poll = e.getQueue().poll(); //這裡可以加一些自己的處理(比如:埋點監控) System.err.println("[" + this.factoryName + "]task will be discard:" + poll); e.execute(r); } } }
當然,這裡出於演示目的,只打了一行錯誤信息,實際應用中大家可以埋點發到kafka之類(以便後續做實時監控預警)。
測試一下:
@Test public void testThreadPool() throws InterruptedException { final ThreadFactory DEMO_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("demo-POOL-%d").build(); final ExecutorService DEMO_POOL = new ThreadPoolExecutor(1, 2, 300L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(5), DEMO_THREAD_FACTORY, new CustomDiscardPolicy("demo-POOL")); for (int i = 0; i < 10; i++) { DEMO_POOL.submit(() -> { try { System.out.println(Thread.currentThread().getId() + " ready!"); //假設線程幹活,需要一段時間 Thread.sleep(500); System.out.println("\t" + Thread.currentThread().getId() + " done!"); } catch (Exception e) { } }); } //等一會兒,讓線程池都跑完,再結束main Thread.sleep(10000); }
提交了10個任務,線程池必然飽和(10>2+5),會丟棄一些早期任務,輸出如下:
從輸出看,丟了3個任務,符合預期。
作者:菩提樹下的楊過出處:http://yjmyzz.cnblogs.com
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。