Java - ThreadPoolExecutor源碼分析 1. 為什麼要自定義線程池 首先ThreadPoolExecutor中,一共提供了7個參數,每個參數都是非常核心的屬性,線上程池去執行任務時,每個參數都有決定性的作用。 但是如果直接採用JDK提供的方式去構建,可見設置的核心參數最多就兩個, ...
Java - ThreadPoolExecutor源碼分析
1. 為什麼要自定義線程池
首先ThreadPoolExecutor中,一共提供了7個參數,每個參數都是非常核心的屬性,線上程池去執行任務時,每個參數都有決定性的作用。
但是如果直接採用JDK提供的方式去構建,可見設置的核心參數最多就兩個,這樣就會導致對線程池的控制粒度很粗。所以在阿裡規範中也推薦自己創建自定義線程池。
自定義構建線程池,可以細粒度的控制線程池,去管理記憶體的屬性,並且針對一些參數的設置可能更好的在後期排查問題。
ThreadPoolExecutor 七大核心參數:
public ThreadPoolExecutor(int corePoolSize, // 核心工作線程(當前任務執行結束後,不會銷毀) int maximumPoolSize, // 最大工作線程(代表當前線程池中一共可以有多少工作線程) long keepAliveTime, // 非核心工作線程在阻塞隊列位置等待時間 TimeUnit unit, // 非核心工作線程在阻塞隊列位置等待時間的單位 BlockingQueue<Runnable> workQueue, // 任務在沒有核心工作線程處理時,任務先到阻塞隊列中 ThreadFactory threadFactory, // 構建線程的線程工廠,可以自定義thread信息 RejectedExecutionHandler handler) // 當線程池無法處理處理任務時,執行拒絕策略
2.ThreadPoolExecutor應用
JDK提供的幾種拒絕策略:
- AbortPolicy: 當前拒絕策略會在無法執行任務時,直接拋出一個異常
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
- CallerRunsPolicy: 當前拒絕策略會在無法執行任務時,將任務交給調用者處理
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
- DiscardPolicy:當前拒絕策略會在無法執行任務時,直接將任務丟棄
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
- DiscardOldestPolicy: 當前拒絕策略會在無法執行任務時,將阻塞隊列中最早的任務丟棄,將當前任務再次交接線程池處理
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
- 當然也可以自定義拒絕策略,根據自己業務修改實現邏輯, 只需實現 RejectedExecutionHandler 類中的 rejectedExecution 方法。
3. ThreadPoolExecutor的核心屬性
線程池的核心屬性就是ctl,它會基於ctl拿到線程池的狀態以及工作線程個數。
// 當前線程的核心屬性 // 當前的ctl其實就是一個int類型的數值,內部是基於AtomicInteger套了一層,進行運算時,是原子操作 // ctl表示線程池的兩個核心屬性 // 線程池的狀態: ctl的高3位,表示線程池狀態 // 工作線程的數量: ctl的低29位,表示工作線程的個數 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer.SIZE: 獲取Integer的bit位個數 // 聲明一個常量: COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; // CAPACITY就是當前工作線程能記錄的工作線程的最大個數 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 線程池狀態表示 // 當前五個狀態中,只有RUNNING狀態表示線程池正常,可以正常接收任務處理 // 111: 代表RUNNING狀態,RUNNING可以處理任務,並且處理阻塞隊列中的任務 private static final int RUNNING = -1 << COUNT_BITS; // 000: 代表SHUTDOWN狀態,不會接收新任務,正在處理的任務正常進行,阻塞隊列中的任務也會處理完 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001: 代表STOP狀態,不在接收新任務,正在處理的任務會被中斷,阻塞隊列中的任務不在處理 private static final int STOP = 1 << COUNT_BITS; // 010: 代表TIDYING狀態,這個狀態是SHUTDOWN或者STOP轉換過來的,代表線程池馬上關閉,過度狀態 private static final int TIDYING = 2 << COUNT_BITS; // 011: 代表TERMINATED狀態,這個狀態是TIDYING轉換過來的,轉換過來需要執行terminated方法 private static final int TERMINATED = 3 << COUNT_BITS; // 下麵方法是幫助運算ctl值的,需要傳入ctl // 基於&運算的特點,保證獲取ctl的高3位的值 private static int runStateOf(int c) { return c & ~CAPACITY; } // 基於&運算的特點,保證獲取ctl的低29位的值 private static int workerCountOf(int c) { return c & CAPACITY; } // runStateOf 和 workerCountOf 方法都是拆包 // 基於|運算的特點,對線程池狀態rs和線程個數wc進行封裝 private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池的轉換方式:
ThreadPoolExecutor中的execute方法
execute方法是提交任務到線程池的核心方法。
execute源碼解析:
// 提交執行任務 // command 就是提交過來的任務 public void execute(Runnable command) { // 提交的任務不能為null 健壯性判斷 if (command == null) throw new NullPointerException(); // 獲取核心屬性ctl值,用於後續判斷 int c = ctl.get(); // 如果工作線程個數小於核心線程數 // 滿足要求,添加核心工作線程 if (workerCountOf(c) < corePoolSize) { // addWorker(任務,是否是核心線程) ture: 核心線程,false:非核心線程 // addWorker返回true: 代表添加工作線程成功 // addWorker返回false: 代表添加工作線程失敗 // addWorker中會基於線程池狀態,以及工作線程個數判斷,查看能否添加工作線程 if (addWorker(command, true)) // 工作線程構建出來了,任務也交給command去處理了 return; // 說明線程池狀態或者是工作線程個數發生了變化,導致添加失敗,需要重新獲取ctl值 c = ctl.get(); } // 添加核心工作線程失敗後 // 先判斷線程池狀態是否是RUNNING狀態,如果是正常基於阻塞隊列的offer方法,將任務添加到阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { // 如果任務添加到阻塞隊列成功,走if內部 // 如果任務在丟到阻塞隊列之前,線程池狀態發生改變了 // 重新獲取ctl int recheck = ctl.get(); // 如果線程池的狀態不是RUNNING狀態,將任務從阻塞隊列中移除 if (! isRunning(recheck) && remove(command)) // 並且直接拒絕策略 reject(command); // 在這,說明阻塞隊列有我剛放進去的任務 // 查看一下工作線程數是不是0個 // 如果工作線程為0個,需要添加一個非核心工作線程去處理阻塞隊列中的任務 // 發生這種情況有兩種: // 1. 構建線程池時,核心線程數可以是0個 // 2. 即使有核心線程,可以設置核心線程也允許超時,設置allowCoreThreadTimeOut(預設false)為ture else if (workerCountOf(recheck) == 0) // 為了避免阻塞隊列中的任務堆積,添加一個非核心線程去處理 addWorker(null, false); } // 任務添加到阻塞隊列失敗 // 構建一個非核心工作線程 // 如果添加非核心工作線程成功,直接完成 else if (!addWorker(command, false)) // 添加失敗,執行拒絕策略 reject(command); }
execute方法流程圖:
ThreadPoolExecutor中的addWorker方法
addWorker方法中主要分為兩大塊:
- 第一塊:校驗線程池的狀態以及工作線程個數
- 第二塊:添加工作線程並且啟動工作線程
// 校驗和添加啟動工作線程 private boolean addWorker(Runnable firstTask, boolean core) { // =======================第一塊==================== // 外層for迴圈在校驗線程池的狀態 // 內層for迴圈是在校驗工作線程的個數 // retry是給外層for迴圈添加的一個標記,為了方便在內層for迴圈跳出到外層for迴圈 retry: for (;;) { // 獲取ctl int c = ctl.get(); // 拿到ctl的高3位的值 int rs = runStateOf(c); // =====================線程池狀態判斷========================== // 如果線程池狀態是SHUTDOWN,並且此時阻塞隊列有任務,工作線程為0,則添加一個工作線程去處理阻塞隊列的任務 // 判斷線程池的狀態是否大於等於SHUTDOWN,滿足則說明線程不是RUNNING狀態 if (rs >= SHUTDOWN && // 如果這三個條件都滿足,就代表是要添加非核心工作線程去處理阻塞隊列中任務 // 如果三個條件有一個不滿足,返回false配合!,就不需要添加 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 不需要添加工作線程 return false; for (;;) { // ===============工作線程個數判斷========================== // 基於ctl拿到低29位的值,代表工作線程的個數 int wc = workerCountOf(c); // 如果工作線程個數大於最大工作線程數CAPACITY值,就不可以添加,返回false if (wc >= CAPACITY || // 基於core來判斷添加的是否是核心工作線程 // 如果是核心: 基於corePoolSize去判斷 // 如果是非核心: 基於maximumPoolSize去判斷 wc >= (core ? corePoolSize : maximumPoolSize)) // 代表不能添加,工作線程個數不滿足要求 return false; // 針對ctl + 1 , 採用CAS操作 if (compareAndIncrementWorkerCount(c)) // CAS成功後,直接退出外層迴圈,代表可以進行執行添加工作線程操作了 break retry; // 重新獲取一次ctl值 c = ctl.get(); // Re-read ctl // 判斷重新獲取的ctl中,線程池的狀態與之前是否有區別 // 如果狀態發生改變,需要重新去判斷線程狀態 if (runStateOf(c) != rs) // 重新進入外層for迴圈 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // =======================第二塊==================== // 添加工作線程以及啟動工作線程 // 聲明瞭三個變數 // workerStarted 表示工作線程啟動狀態,預設false boolean workerStarted = false; // workerAdded 表示工作線程添加狀態,預設false boolean workerAdded = false; // w 表示工作線程 Worker w = null; try { // 構建工作線程,並且將任務傳遞進去 w = new Worker(firstTask); // 獲取worker中的thread對象 final Thread t = w.thread; // 判斷thread是否不為null, 在new worker時,內部會通過給予的threadFactory去構造thread交給worker // 一般如果為null,代表ThreadFactory有問題 if (t != null) { // 加鎖,保證使用worker成員變數以及對largestPoolSize賦值時,保證線程安全 final ReentrantLock mainLock = this.mainLock; // 加鎖, 因為後續要操作HashSet是線程不安全的 mainLock.lock(); try { // 再次獲取線程池狀態 int rs = runStateOf(ctl.get()); // 再次判斷 // 如果滿足 rs < SHUTDOWN,說明線程池是RUNNING狀態,可以繼續執行 // 如果線程池狀態為SHUTDOWN,並且firstTask為null,添加非核心線程處理阻塞隊列任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 進入這裡,就可以添加工作線程 // 將threadFactory構建線程後,不能直接啟動線程,如果啟動則拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); /** * 包含池中所有工作線程的集合。只有在獲得mainLock時才能訪問 * private final HashSet<Worker> workers = new HashSet<Worker>(); * 將創建好的worker放入工作線程集合中 */ workers.add(w); // 獲取工作線程集合的大小,拿到工作線程個數 int s = workers.size(); // largestPoolSize在記錄最大線程個數的記錄 // 如果當前工作線程個數,大於最大線程個數的記錄,就賦值 if (s > largestPoolSize) largestPoolSize = s; // 設置工作線程添加成功 workerAdded = true; } } finally { // 釋放鎖 mainLock.unlock(); } // 如果工作線程添加成功 if (workerAdded) { // 直接啟動worker中的線程 t.start(); // 設置啟動工作線程成功 workerStarted = true; } } } finally { // 做補償的操作,如果工作線程啟動失敗,將這個添加失敗的工作線程處理掉 if (! workerStarted) // 從工作線程的集合中移除掉 addWorkerFailed(w); } // 返回工作線程釋放啟動成功 return workerStarted; }
線程池為啥要構建空任務的非核心線程?
- 第一個:在 execute 方法中有個判斷工作線程是否為0,是就添加一個空任務的非核心線程;
else if (workerCountOf(recheck) == 0) addWorker(null, false);
- 第二個:在工作線程 Worker 啟動後,工作線程會運行 runWorker 方法,該方法中有個操作,當工作線程結束之後會執行 processWorkerExit 方法,在這個方法內部又有添加一個空任務的非核心線程;
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
綜合上訴,Java它有一個這樣的場景:
在初始化線程池的時候可能設置線程池的核心線程數為0 或者 設置allowCoreThreadTimeOut(預設false)為ture導致核心線程超時釋放,存在沒有核心線程的情況。
當我們把任務添加到阻塞隊列之後,沒有工作線程導致阻塞隊列任務堆積,直到後續有新任務加入才會去創建工作線程。
/** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. * 如果為false(預設值),核心線程即使在空閑時也會保持活動狀態。 * 如果為true,核心線程將使用keepAliveTime超時等待工作 */ private volatile boolean allowCoreThreadTimeOut;
綜上所述,因此線程池需要構建空任務的非核心線程去處理這種情況。
線程池使用完為什麼必須執行shutdown方法或者shutdownNow方法?
- 第一點:線上程池 addWorker 方法中我們可以看到,
線程池啟動線程也是基於 Thread 對象去進行的一個 start 方法啟動的,像這種它會占用jvm的棧,
所以屬於GC Roots 通過垃圾回收的可達性分析演算法,這種線程就不能被回收,會一直占用jvm的資源,
因此不能及時的調用 shutdown 或者 shutdownNow 方法,就可能造成記憶體泄漏問題!!!
- 第二點:線程池啟動對象是基於你 Worker 對象內部的 Thread 對象啟動的,
當執行Thread對象的 start方法時,它會執行 Worker對象的 run 方法,
該方法中的runWorker 方法傳入的是 this 就是當前的Worker對象,
就會導致啟動的線程還指向了Worker對象,這個Worker對象是不能回收的,
又因為Worker對象屬於線程池的內部類,
導致整個 ThreadPoolExecutor 線程池對象也不會被回收!!!
綜上所述,當使用完線程池對象後,沒有及時的調用關閉方法,會導致堆記憶體資源消耗很嚴重,最後會導致記憶體泄漏問題!
線程池的核心參數該如何設置?
主要的難點在於任務類型無法控制,比如:
cpu密集型: cpu不斷的處理任務,大量的計算等操作。
IO密集型: 不需要cpu一直調度,大多數時間都是等待結果的,如:調用第三方服務等待網路響應、等待IO響應、查詢資料庫等待資料庫響應等等。
混合型:上面兩種都會有。
大多數情況都需自己去測試,調試!沒有絕對固定的一個公式。可以參考:
N thread = N cpu * U cpu * ( 1 + W / C )
線程數 = cpu的個數 * cpu的利用率 * ( 1 + 等待時間 / 計算時間 ) 註:W/C 是程式運行時 等待時間和計算時間的比值
1 * 100% * (1 + 50% / 50% )= 2
公式只是給定一個調試的初始值,需要自己後續測試調試!
以上可能還有不足,僅供參考!!!