Java中的線程池使用及原理

来源:https://www.cnblogs.com/yuyiming/archive/2023/07/31/17592483.html
-Advertisement-
Play Games

## 開篇-為什麼要使用線程池? ​ Java 中的線程池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來 3 個好處。 ​ 第一:降低資源消耗。通過重覆利用已創建的線程降低線程創建和銷毀造成的消耗。 ​ 第二:提高響應速度。當任 ...


開篇-為什麼要使用線程池?

​ Java 中的線程池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來 3 個好處。

​ 第一:降低資源消耗。通過重覆利用已創建的線程降低線程創建和銷毀造成的消耗。

​ 第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

​ 第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。但是,要做到合理利用線程池,必須對其實現原理瞭如指掌。

1. 線程池的任務執行步驟

​ 當向線程池提交一個任務之後,線程池是如何處理這個任務的呢?處理流程圖如圖 1-1所示。

圖1-1 線程池的主要處理流程

​ 從圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程如下。

​ 1.線程池判斷核心線程池裡的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池裡的線程都在執行任務,則進入下個流程。

​ 2.線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。

​ 3.線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

ThreadPoolExecutor 執行 execute()方法的示意圖,如圖 1-2 所示。

圖1-2 ThreadPoolExecutor執行示意圖
​ `ThreadPoolExecutor `執行 `execute` 方法分下麵 4 種情況。
  • 如果當前運行的線程少於 corePoolSize,則創建新線程來執行任務(註意,執行這一步驟需要獲取全局鎖)。

  • 如果運行的線程等於或多於 corePoolSize,則將任務加入 BlockingQueue

  • 如果無法將任務加入 BlockingQueue(隊列已滿),則創建新的線程來處理任務(註意,執行這一步驟需要獲取全局鎖)。

  • 如果創建新線程將使當前運行的線程超出 maximumPoolSize,任務將被拒絕,並調用 RejectedExecutionHandler.rejectedExecution()方法。

    ThreadPoolExecutor 採取上述步驟的總體設計思路,是為了在執行 execute()方法時,儘可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預熱之後(當前運行的線程數大於等於 corePoolSize),幾乎所有的 execute()方法調用都是執行步驟 2,而步驟 2 不需要獲取全局鎖。

2. 線程池的使用

2.1 線程池的創建

通過ThreadPoolExecutor可以創建一個線程池。

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler)

創建一個線程池時需要輸入幾個參數,如下。

  1. corePoolSize(線程池的基本大小,也可以稱之為核心線程數大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。

  2. maximumPoolSize(線程池最大數量):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會創建新的線程執行任務。值得註意的是,如果使用了無界的任務隊列,這個參數就沒有什麼效果了。

  3. keepAliveTime(線程活動保持時間):線程池的工作線程空閑後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。預設情況下該參數針對的是非核心線程,如果將參數allowCoreThreadTimeOut設置為true,那麼核心線程也會受這個參數影響

  4. TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

  5. workQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。具體可以參考Java阻塞隊列。

  6. threadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。

  7. RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時拋出異常。在 JDK 1.5 中 Java 線程池框架提供了以下 4 種策略。

    AbortPolicy:直接拋出異常。

    CallerRunsPolicy:只用調用者所線上程來運行任務。

    DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。

    DiscardPolicy:不處理,丟棄掉。

    當然,也可以根據應用場景需要來實現 RejectedExecutionHandler 介面自定義策略。如記錄日誌或持久化存儲不能處理的任務。

2.2 向線程池中提交任務

​ 可以使用兩個方法向線程池提交任務,分別為 execute()和 submit()方法。

​ execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知 execute()方法輸入的任務是一個 Runnable 類的實例。

executorService.execute(()-> Thread.currentThread().getName());

​ submit()方法用於提交需要返回值的任務。線程池會返回一個 future 類型的對象,通過這個 future 對象可以判斷任務是否執行成功,並且可以通過 future 的 get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時候有可能任務沒有執行完。

Future<String> submit = executorService.submit(() -> Thread.currentThread().getName());
System.out.println(submit.get());

2.3 關閉線程池

​ 可以通過調用線程池的 shutdownshutdownNow 方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然後逐個調用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow 首先將線程池的狀態設置成 STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而 shutdown 只是將線程池的狀態設置成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的線程。

​ 只要調用了這兩個關閉方法中的任意一個,isShutdown 方法就會返回 true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調shutdown 方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。

2.4 合理地配置線程池

​ 要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。

​ • 任務的性質:CPU 密集型任務、IO 密集型任務和混合型任務。

​ • 任務的優先順序:高、中和低。

​ • 任務的執行時間:長、中和短。

​ • 任務的依賴性:是否依賴其他系統資源,如資料庫連接。

​ 性質不同的任務可以用不同規模的線程池分開處理。CPU 密集型任務應配置儘可能小的線程,如配置 N+1(其中N是CPU合適)個線程的線程池。由於 IO 密集型任務線程並不是一直在執行任務,則應配置儘可能多的線程,如 2*N(其中N是CPU合適)。混合型的任務,如果可以拆分,將其拆分成一個 CPU 密集型任務和一個 IO 密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串列執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過 Runtime.getRuntime().availableProcessors()方法獲得當前設備的 CPU 個數。

​ 優先順序不同的任務可以使用優先順序隊列 PriorityBlockingQueue 來處理。它可以讓優先順序高的任務先執行。

​ 執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先順序隊列,讓執行時間短的任務先執行。

​ 依賴資料庫連接池的任務,因為線程提交 SQL 後需要等待資料庫返回結果,等待的時間越長,則 CPU 空閑時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。

建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有一次,我們系統里後臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行 SQL 變得非常緩慢,因為後臺任務線程池裡的任務全是需要向資料庫查詢和插入數據的,所以導致線程池裡的工作線程全部阻塞,任務積壓線上程池裡。如果當時我們設置成無界隊列,那麼線程池的隊列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。當然,我們的系統所有的任務是用單獨的伺服器部署的,我們使用不同規模的線程池完成不同類型的任務,但是出現這樣問題時也會影響到其他任務。

2.5 線程池的監控

​ 如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的時候可以使用以下屬性。

​ • taskCount:線程池需要執行的任務數量。

​ • completedTaskCount:線程池在運行過程中已完成的任務數量,小於或等於taskCount

​ • largestPoolSize:線程池裡曾經創建過的最大線程數量。通過這個數據可以知道線程池是否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。

​ • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自動銷毀,所以這個大小隻增不減。

​ • getActiveCount:獲取活動的線程數。

​ 通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecuteafterExecute terminated 方法,也可以在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。

3.線程池的生命周期

圖1-3 線程池的生命周期

線程池的生命周期包括以下幾個狀態:

  1. 初始狀態(NEW):線程池被創建後處於初始狀態。此時線程池沒有包含任何線程,也沒有開始執行任務。

  2. 運行狀態(RUNNING):通過調用線程池的 execute()submit() 方法,線程池開始接受任務並創建線程執行。線程池可以動態地調整線程數量來適應任務的需求。

  3. 關閉狀態(SHUTDOWN):當調用線程池的 shutdown() 方法後,線程池進入關閉狀態。此時線程池不會再接受新的任務提交,但會繼續處理已經提交的任務直到完成。處於關閉狀態的線程池仍然可以調用 execute() 方法來提交任務,但會拋出 RejectedExecutionException

  4. 停止狀態(STOP):通過調用線程池的 shutdownNow() 方法可以使線程池進入停止狀態。此時線程池會立即停止,取消所有正在執行的任務,並且丟棄所有等待執行的任務。

  5. 整理狀態(TIDYING):當線程池處於STOP狀態或者SHUTDOWN後,並且所有任務都已經完成,線程池會進入整理狀態。在整理狀態中,線程池會清理已終止的工作線程。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。

  6. 終止狀態(TERMINATED):當線程池完成整理操作後,最終進入終止狀態。此時線程池徹底終止,不再接受任務和執行任務。

註意,線程池的狀態可以通過isShutdown()isTerminated() 方法進行查詢,以確定線程池當前所處的狀態。

4.代碼分析線程池的運行原理

4.1 線程池控制狀態ctl

ctl 是線程池源碼中常常用到的一個變數,它的主要作用是記錄線程池的生命周期狀態和當前工作的線程數。它是一個原子整型變數。ctl是一個32位的整數,高3位用於表示線程池的運行狀態,低29位用於表示線程池中的線程數量。具體的結構如下所示:

31-29   |   線程池運行狀態(用來保存線程池的狀態 RUNNING,SHUTDOWN,STOP,STOP,STOP)
28-0    |   線程池中線程數量

ctlThreadPoolExecutor中的聲明和初始化

源碼:java.util.concurrent.ThreadPoolExecutor#ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  1. ctl (線程池控制狀態)是原子整型的,這意味這對它進行的操作具有原子性。
  2. 如此一來,作為ctl組成部分的 runState (線程池生命周期狀態)和 workerCount (工作線程數) 也將同時具有原子性。
  3. ThreadPoolExecutor 使用 ctlOf 方法來將 runState workerCount 兩個變數(都是整型)打包成一個 ctl 變數。

4.1.1 工具人常量COUNT_BITS 和 CAPACITY

源碼:java.util.concurrent.ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  1. Integer.SIZE 為整型的位數32位 ,COUNT_BITS位32 - 3 = 29。
  2. (1 << COUNT_BITS) - 1 的計算過程如下:
int 類型的1用二進位表示為:00000000 00000000 00000000 00000001
1 << COUNT_BITS,其中COUNT_BITS為29,1 << 29,代表將數字1向左移動29位。結果為:00100000 00000000 00000000 00000000
將上一步計算出的結果減1,最終CAPACITY用二進位表示為:00011111 11111111 11111111 11111111
  1. 因此在接下來的代碼中, COUNT_BITS 就用來表示分隔runStateworkerCount 的位數。
  2. 而CAPACITY 則作為取變數runStateworkerCount 的工具。

4.1.2 線程池的生命周期常量

源碼:java.util.concurrent.ThreadPoolExecutor
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

代碼中的五個變數分別對應著線程池中的五個生命周期,他們的二進位表示如下:

RUNNING    二進位為 11100000 00000000 00000000 00000000
SHUTDOWN   二進位為 00000000 00000000 00000000 00000000
STOP       二進位為 00100000 00000000 00000000 00000000
TIDYING    二進位為 01000000 00000000 00000000 00000000
TERMINATED 二進位為 01100000 00000000 00000000 00000000

通過這裡可以看出為什麼將COUNT_BITS 設置為32 - 3,而不是將4或者其它的數。目的是將32位的高三位用來表示線程池的狀態,後29位用來表示線程池的最大容量。

4.1.3 方法runStateOfworkerCountOf,ctlOf

源碼:java.util.concurrent.ThreadPoolExecutor
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf方法用來獲取當前線程池的狀態,~CAPACITY就是11100000 00000000 00000000 00000000。再通過ctl與其進行&運算就可以獲取高三位的值,獲取線程池的狀態。
  • workerCountOf方法通過將ctl與CAPACITY&運算可以計算出線程池中的工作線程數。
  • ctlOf方法runStateworkerCount 進行 | (按位或)操作來得到 ctl 變數,就是因為 runState 的高 3 位為有效信息,而 workerCount 的低 29 位為有效信息,合起來正好得到一個含 32 位有效信息的整型變數。

4.2 源碼分析線程池的執行流程

4.2.1 ThreadPoolExecutor.execute方法

execute用於向線程池中提交任務

方法源碼如下:

代碼清單4-1 java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	/*
	 * Proceed in 3 steps:
	 *
	 * 1. If fewer than corePoolSize threads are running, try to
	 * start a new thread with the given command as its first
	 * task.  The call to addWorker atomically checks runState and
	 * workerCount, and so prevents false alarms that would add
	 * threads when it shouldn't, by returning false.
	 *
	 * 2. If a task can be successfully queued, then we still need
	 * to double-check whether we should have added a thread
	 * (because existing ones died since last checking) or that
	 * the pool shut down since entry into this method. So we
	 * recheck state and if necessary roll back the enqueuing if
	 * stopped, or start a new thread if there are none.
	 *
	 * 3. If we cannot queue task, then we try to add a new
	 * thread.  If it fails, we know we are shut down or saturated
	 * and so reject the task.
	 */
    // 獲取當前線程池的控制狀態ctl。
	int c = ctl.get();
    // workerCountOf(c)獲取工作線程數,判斷工作線程數是否小於核心線程數
	if (workerCountOf(c) < corePoolSize) {
        // 工作線程數小於核心線程數,調用addWorker(command, true) 添加核心線程並且執行任務,放回true直接結束方法。addWorker方法見4.2.2中的代碼清單4-2
		if (addWorker(command, true))
			return;
        // addWorker(command, true) 調用失敗,更新ctl的臨時變數c,重新獲取線程池控制狀態.
		c = ctl.get();
	}
	if (isRunning(c) && workQueue.offer(command)) {
        // 如果線程池是Running狀態,並且往阻塞隊列中添加任務完成。進入這裡
        // 考慮併發帶來的影響,可能往同步隊列中添加元素之前,線程池被調用了shutDown,或者shutDownNow等方法想關閉線程池。這裡再一次的確定線程池的狀態。如果還是Running,那代表線程池的狀態沒變,一直是Running。
		int recheck = ctl.get();
        // 線程池的狀態變動了,調用remove方法從隊列中移除任務。
		if (! isRunning(recheck) && remove(command))
            // 拒絕任務
			reject(command);
        // 走到這裡,代表1.往同步隊列中添加元素成功,並且2.線程池的狀態一直是Running狀態。
		else if (workerCountOf(recheck) == 0)
            // 到了這裡,阻塞隊列中有任務了,但是工作線程數為0,這種情況一般是臨界點,舉個例子,線程池中創建時,corePoolSize為 1,maximumPoolSize為 1,阻塞隊列使用無界隊列,再調用此excute方法之前,線程池一直是有個線程A在執行任務的,此時調用完workQueue.offer(command)往隊列中添加元素之後,突然,執行任務的線程A突然遇到了一個空指針異常,線程A走終止邏輯。這是線程池中就需要一個新的線程來繼續執行阻塞隊列中的任務了。線程A終止的時候會調用processWorkerExit方法,該方法中也有addWorker(null, false);這個代碼片段。
			addWorker(null, false);
	}
 	// 走到這裡說明,線程池中的工作線程總數已經大於等於corePoolSize;線程池不是RUNNING狀態,或者說線程池是RUNNING狀態但無法繼續往隊列中添加元素了,這時候就需要創建非核心線程去執行任務了。
	else if (!addWorker(command, false))
        // 創建非核心任務執行
		reject(command);
}

4.2.2 addWorker 方法

addWorker被用於向線程池添加工作線程,該方法的用於將線程池的工作線程數的變數加1,然後創建一個線程並且運行。

代碼清單4-2 java.util.concurrent.ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
    // for迴圈,這一層的功能用於將線程池的工作線程數變數workerCountOf加1
	retry:
	for (;;) {
        // 獲取線程池控制狀態ctl
		int c = ctl.get();
        // 獲取線程池的狀態
		int rs = runStateOf(c);

		// Check if queue empty only if necessary.
        // rs >= SHUTDOWN 的狀態有SHUTDOWN,STOP,TIDYING,TERMINATED 代表線程池不在接收新的任務了
        // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 該方法要返回true,裡面的3個判斷條件至少有一個為false,rs == SHUTDOWN 判斷線程狀態為SHUTDOWN,firstTask == null 代表創建工作線程時指定的任務為null,這種情況是線程池主動調用addWorker方法才會出現的,比如代碼清單4-1 execute方法 代碼裡面的代碼片段addWorker(null, false),接著是 ! workQueue.isEmpty() 判斷工作隊列是否為空。如果上述條件判斷中的任何條件為真,就表示不能滿足添加工作線程的條件,會返回 false。這意味著在rs >= SHUTDOWN,只有當線程池狀態為 SHUTDOWN、沒有待執行的任務並且工作隊列不為空時,才能添加工作線程,添加線程去執行同步隊列中剩下的任務。
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;

		for (;;) {
            // 獲取當前線程池的工作線程數
			int wc = workerCountOf(c);
            // wc >= CAPACITY 返回true代表線程池中的線程已經>=2^29-1(CAPACITY 設定最大值為2^29-1,CAPACITY用來表示線程池工作線程的數量),wc >= (core ? corePoolSize : maximumPoolSize) 返回true代表當前要添加類型的線程數已經滿了。
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
            // 將工作線程數加1
			if (compareAndIncrementWorkerCount(c))
                // 成功就跳出外層迴圈retry
				break retry;
            // CAS失敗重新獲取ctl的值
			c = ctl.get();  // Re-read ctl
            // 檢查重新讀取的runStateOf(c)是否與之前的狀態rs不同,如果不同則繼續外層迴圈retry
			if (runStateOf(c) != rs)
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	}

    // 下麵的代碼用於往線程池中添加工作線程。
    
    // workerStarted 工作線程啟動狀態位,true代表啟動成功。
	boolean workerStarted = false;
    // workerAdded 工作線程添加標誌位,true代表添加成功。
	boolean workerAdded = false;
	Worker w = null;
	try {
        // 創建一個Worker,詳細見4.2.3 代碼清單4-3
		w = new Worker(firstTask);
        // 獲得工作線程
		final Thread t = w.thread;
		if (t != null) {
            // 加鎖,防止以下操作併發
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				// Recheck while holding lock.
				// Back out on ThreadFactory failure or if
				// shut down before lock acquired.
				int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN 如果線程池的狀態小於SHUTDOWN,這裡可以取到的狀態枚舉有RUNNING,rs == SHUTDOWN && firstTask == null,這段返回true說明線程池的狀態雖然是SHUTDOWN狀態,但需要處理阻塞隊列中剩下的任務。
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive()) // precheck that t is startable
                        // 如果線程t是活動的拋出異常。這邊還沒有啟動線程t,線程t就已經活動了,說明有問題。
						throw new IllegalThreadStateException();
                    // 將worker放入workers,workers是一個HashSet,用來存放所有的工作線程Worker。
					workers.add(w);
					int s = workers.size();
                    // largestPoolSize,曾經出現過的最大線程數,如果s > largestPoolSize,重新賦值,更新線程池峰值
					if (s > largestPoolSize)
						largestPoolSize = s;
                    // 工作線程添加成功。
					workerAdded = true;
				}
			} finally {
				mainLock.unlock();
			}
			if (workerAdded) {
                // 開啟工作線程。工作線程執行邏輯見代碼4.2.3 代碼清單4-3
				t.start();
                // 工作線程開啟成功
				workerStarted = true;
			}
		}
	} finally {
		if (! workerStarted)
            // 工作線程啟動失敗,見如下的代碼片段1
			addWorkerFailed(w);
	}
	return workerStarted;
}

// 代碼片段1 java.util.concurrent.ThreadPoolExecutor#addWorkerFailed
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 從工作線程w從工作線程集合中移除
        if (w != null)
            workers.remove(w);
        // 線程工作workerCount數量減1    
        decrementWorkerCount();
        // 基於狀態判斷嘗試終結線程池,下麵的小結會分析該方法
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4.2.3 runWorker方法

addWorker 方法 完成了工作線程創建和啟動。addWorker方法中的t.start()方法調用完成後會調用Worker的run方法,然後調用runWorker方法來進行工作。

代碼清單4-3 java.util.concurrent.ThreadPoolExecutor.Worker
Worker(Runnable firstTask) {
    // 設置同步狀態為 -1,禁止線程中斷,直到runWorker()方法執行,結合java.util.concurrent.ThreadPoolExecutor.Worker#interruptIfStarted 方法可以知道原因,interruptIfStarted放在代碼清單 4-3。
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // Worker 實現了Runnable介面,將Worker自身傳入到了newThread中,到時候線程啟動時調用Worker.run()。
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
	runWorker(this);
}

# java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
    // 獲取當前工作線程要執行的任務
	Runnable task = w.firstTask;
    // 將firstTask置為null,表示任務被取出
	w.firstTask = null;
    // 因為初始化Worker的時候將線程標誌設置為了-1,這裡把state更新為0,允許線程中斷
	w.unlock(); // allow interrupts
    // 記錄線程是否因為用戶異常終結,預設是true
	boolean completedAbruptly = true;
	try {
        // 一個迴圈,當任務非空,或者通過getTask()方法可以從同步隊列中獲取任務時進入迴圈,如果while命中後半段當前線程會處於阻塞或者超時阻塞狀態。關於getTask方法見4.2.4 代碼清單4-4.
		while (task != null || (task = getTask()) != null) {
            // 可能會疑問這一步為什麼要上鎖,這塊根本不存在併發啊!!!其實是線程池在執行shutdown等方法時會調用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;獲取不了鎖說明線程正在執行任務,否則就是處於等待任務或者空閑狀態,可以被中斷。
			w.lock();
			// If pool is stopping, ensure thread is interrupted;
			// if not, ensure thread is not interrupted.  This
			// requires a recheck in second case to deal with
			// shutdownNow race while clearing interrupt
            // 這個條件判斷工作線程是否需要中斷。如果線程池處於STOP,TIDYING,TERMINATED狀態,或者當前調用該方法的線程已經被中斷並且線程池處於STOP,TIDYING,TERMINATED狀態,且當前工作線程未被中斷(如果isInterrupted返回true,代表已被中斷,那就沒必要再調用wt.interrupt()中斷一次了),則中斷當前工作線程,這裡的中斷僅僅是將中斷標誌位設為true了,具體的中斷邏輯還需要用戶實現。
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				wt.interrupt();
			try {
                // 鉤子函數,執行任務前的準備工作,開發者可以去重寫他的實現。
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
                    // 鉤子函數,任務執行完畢時調用,開發者可以去重新他的實現
					afterExecute(task, thrown);
				}
			} finally {
                // 將臨時表量task置為null
				task = null;
                // 累加Worker完成的任務數
				w.completedTasks++;
                // 解鎖
				w.unlock();
			}
		}
        // 走到這裡說明getTask()返回為null,線程正常退出,移除的話就直接調用下麵的finally了
		completedAbruptly = false;
	} finally {
        // 處理線程退出,completedAbruptly為true說明工作線程非正常退出,false表示正常退出。processWorkerExit見4.2.5 代碼清單4-5
		processWorkerExit(w, completedAbruptly);
	}
}

void interruptIfStarted() {
	Thread t;
	if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
		try {
			t.interrupt();
		} catch (SecurityException ignore) {
		}
	}
}

4.2.4 getTask方法

工作線程嘗試從隊列中獲取任務

代碼清單4-4 java.util.concurrent.ThreadPoolExecutor#getTask
private Runnable getTask() {
    // 上一次調用poll從隊列中拉取任務是否超時,預設false,沒超時。該狀態用於配合線程池的超時退出機制使用。
	boolean timedOut = false; // Did the last poll() time out?

	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);

		// Check if queue empty only if necessary.
        // 判斷:
        // 1.rs >= SHUTDOWN,線程池狀態至少是SHUTDOWN
        // 2.在滿足條件1的情況下,rs >= STOP 線程池的狀態已經至少是stop了,stop狀態不需要繼續處理隊列中的任務了;workQueue.isEmpty()說明隊列已經空了,說明工作線程可以退出了。
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
			decrementWorkerCount();
			return null;
		}

		int wc = workerCountOf(c);

		// Are workers subject to culling?
        // 參數allowCoreThreadTimeOut是用來控制核心線程是否需要超時退出的一個變數,預設情況下該變數為false(表示核心線程不需要超時退出),wc > corePoolSize 說明有非核心線程加入了線程池,這些線程是需要超時退出的。
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // wc > maximumPoolSize 當前的工作線程總數大於maximumPoolSize,說明瞭通過setMaximumPoolSize()方法減少了線程池容量
        // timed && timedOut 返回true 說明瞭線程是需要超時退出的,並且上一輪迴圈通過poll()方法從任務隊列中拉取任務為null
		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
            // cas WorkerCount - 1,成功返回null,失敗繼續下一輪。
			if (compareAndDecrementWorkerCount(c))
				return null;
			continue;
		}

		try {
            // 如果timed為true,通過poll()方法做超時拉取,keepAliveTime時間內沒有等待到有效的任務,則返回null,如果timed為false,通過take()做阻塞拉取,會阻塞到有下一個有效的任務時候再返回(一般不會是null)
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take(); // 註意,這裡是能被中斷喚醒的,中斷喚醒會繼續執行迴圈。
			if (r != null)
				return r;
            // 代表調用 workQueue.poll() 超時返回null
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

4.2.5 processWorkerExit方法

工作線程退出時走的方法

代碼清單4-5 java.util.concurrent.ThreadPoolExecutor#processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 因為拋出用戶異常導致線程終結,直接使工作線程數減1即可
    // 如果沒有任何異常拋出的情況下是通過getTask()返回null引導線程正常跳出runWorker()方法的while死迴圈從而正常終結,這種情況下,在getTask()中已經把線程數減1
	if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
		decrementWorkerCount();

	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
        // 線程退出時,加上他完成的任務
		completedTaskCount += w.completedTasks;
        // 將線程從workers集合中移除。
		workers.remove(w);
	} finally {
		mainLock.unlock();
	}

    // 用於根據當前線程池的狀態判斷是否需要進行線程池terminate處理,見4.2.6 代碼清單4-6
	tryTerminate();

	int c = ctl.get();
    // 如果線程池的狀態小於STOP,也就是處於RUNNING或者SHUTDOWN狀態的前提下:
    // 1.如果線程不是由於拋出用戶異常終結,如果允許核心線程超時,則保持線程池中至少存在一個工作線程
    // 2.如果線程由於拋出用戶異常終結,或者當前工作線程數為0,那麼直接添加一個新的非核心線程
	if (runStateLessThan(c, STOP)) {
		if (!completedAbruptly) {
            // 理論上線程池中的最小線程數,允許線程池超時,最小為0,否則為corePoolSize
			int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min == 0,隊列中還有任務,那麼將min == 1
			if (min == 0 && ! workQueue.isEmpty())
				min = 1;
            // 工作線程數大於等於最小值,直接返回不新增線程
			if (workerCountOf(c) >= min)
				return; // replacement not needed
		}
		addWorker(null, false);
	}
}

4.2.6 tryTerminate方法

根據當前線程池的狀態判斷是否需要進行線程池terminate處理,在工作線程退出時,調用shutdown方法,shutdownNow方法方法時都會調用這個方法。

代碼清單4-6 java.util.concurrent.ThreadPoolExecutor#tryTerminate
final void tryTerminate() {
	for (;;) {
		int c = ctl.get();
        // 判斷線程池的狀態,如果是下麵三種情況下的任意一種則直接返回:
        // 1.線程池處於RUNNING狀態(RUNNING不需要結束線程池)
        // 2.線程池至少為TIDYING狀態,也就是TIDYING或者TERMINATED狀態,意味著已經走到了下麵的步驟,線程池即將終結
        // 3.線程池狀態為SHUTDOWN狀態並且任務隊列不為空(SHUTDOWN狀態還需要處理完任務隊列中的任務)
		if (isRunning(c) ||
			runStateAtLeast(c, TIDYING) ||
			(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
			return;
        // 工作線程數不為0,則中斷工作線程集合中的第一個空閑的工作線程
		if (workerCountOf(c) != 0) { // Eligible to terminate
            // 到了這一步,說明工作線程中還有工作線程,不允許結束線程池
            // 調用interruptIdleWorkers方法中斷工作線程,interruptIdleWorkers方法代碼在下麵,interruptIdleWorkers(ONLY_ONE)只會中斷一個線程,被中斷的線程會繼續進入tryTerminate方法去判斷workerCountOf是否為0,最終總會有一個線程會執行到下麵的代碼去。
			interruptIdleWorkers(ONLY_ONE);
            // 說明該線程不是最後一個線程,執行上面的interruptIdleWorkers去中斷其它線程(有這麼一種情況,非空閑的線程在執行某個任務,執行任務完畢之後,如果它剛好是核心線程,就會在下一輪迴圈阻塞在任務隊列的take()方法,如果不做額外的干預,它甚至會線上程池關閉之後永久阻塞在任務隊列的take()方法中。為了避免這種情況,每個工作線程退出的時候都會嘗試中斷工作線程集合中的某一個空閑的線程,確保所有空閑的線程都能夠正常退出。)
			return;
		}

        //註意-方法走到了這裡,說明結束線程池中的工作線程為0了-
		final ReentrantLock mainLock = this.mainLock;
		mainLock.lock();
		try {
            // CAS設置線程池狀態為TIDYING,如果設置成功則執行鉤子方法terminated()
			if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
				try {
					terminated();
				} finally {
                    // 最後更新線程池狀態為TERMINATED
					ctl.set(ctlOf(TERMINATED, 0));
                    // 喚醒阻塞在termination條件的所有線程,這個變數的await()方法在awaitTermination()中調用
					termination.signalAll();
				}
				return;
			}
		} finally {
			mainLock.unlock();
		}
		// else retry on failed CAS
	}
}

// 中斷空閑的工作線程,onlyOne為true的時候,只會中斷工作線程集合中的某一個線程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // t.isInterrupted() 為 true表示已經被中斷了,這裡是去尋找第一個沒有被中斷的空閑線程。w.tryLock() 加鎖成功表示線程是空閑的,否則表示線程在執行任務,不允許中斷。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // 這裡跳出迴圈,也就是只中斷集合中第一個工作線程,集合中的第一個線程被中斷
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

到這裡,淺析了execute方法的執行流程,從工作線程的創建,工作線程的結束,阻塞隊列線上程池中的作用等等,下麵將介紹線程池的關閉方法shutdown和shutdownNow

4.3 線程池的關閉

關於shutdownshutdownNow方法的使用可參考本篇 2.3 關閉線程池

4.3.1 shutdown方法

public void shutdown() {
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
        // 許可權校驗,安全策略相關判斷
		checkShutdownAccess();
        // 設置SHUTDOWN狀態
		advanceRunState(SHUTDOWN);
        // 中斷所有的空閑的工作線程
		interruptIdleWorkers();
        // 鉤子方法
		onShutdown(); // hook for ScheduledThreadPoolExecutor
	} finally {
		mainLock.unlock();
	}
    // 調用上面分析果敢的嘗試terminate方法,使狀態更變為TIDYING,執行鉤子方法terminated()後,最終狀態更新為TERMINATED
	tryTerminate();
}

// 中斷所有的空閑的工作線程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

4.3.2 shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 許可權校驗,安全策略相關判斷
        checkShutdownAccess();
        // 設置STOP狀態
        advanceRunState(STOP);
        // 中斷所有的工作線程
        interruptWorkers();
        // 清空工作隊列並且取出所有的未執行的任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
     // 調用上面分析果敢的嘗試terminate方法,使狀態更變為TIDYING,執行鉤子方法terminated()後,最終狀態更新為TERMINATED
    tryTerminate();
    return tasks;
}

// 遍歷所有的工作線程,如果state > 0(啟動狀態)則進行中斷
private void interruptWorkers() {
    // assert mainLock.isHeldByCurrentThread();
    for (Worker w : workers)
        w.interruptIfStarted();
}

void interruptIfStarted() {
	Thread t;
	if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
		try {
			t.interrupt();
		} catch (SecurityException ignore) {
		}
	}
}

參考

書籍《Java併發編程的藝術》--Java中的線程池

詳解Java線程池的ctl(線程池控制狀態)【源碼分析】

線程池與線程的幾種狀態


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是**恩智浦i.MX RT1170 FlexSPI NAND啟動時間**。 本篇是 i.MXRT1170 啟動時間評測第四彈,前三篇分別給大家評測了 [Raw NAND 啟動時間](https://www.cnblogs.com/henj ...
  • 一、插入數據優化 1.1 批量插入 如果有多條數據需要同時插入,不要每次插入一條,然後分多次插入,因為每執行一次插入的操作,都要進行資料庫的連接,多個操作就會連接多次,而一次批量操作只需要連接1次 1.2 手動提交事務 因為Mysql預設每執行一次操作,就會提交一次事務,這樣就會涉及到頻繁的事務的開 ...
  • “莆仙小館”——莆田文化展示APP 文化展示程式目的在於應用科學技術助推家鄉優秀傳統文化的展示與交流。通過圖片、視頻、音頻等展示方式向用戶立體地展示一個文化城邦。傳統文化與科學技術的有效融合,順應了社會發展的需要。傳統文化與科學技術的有效融合是發展中國特色社會主義文化的客觀需要,是傳承中國優秀傳統文 ...
  • # 解決方案 使用`ngClass`和`ngStyle`可以進行樣式的綁定。 ## ngStyle的使用 ngStyle 根據組件中的變數, isTextColorRed和fontSize的值來動態設置元素的顏色和字體大小 ```HTML This text has dynamic styles b ...
  • 在本篇文章中,我們詳細介紹了 Flutter 進階的主題,包括導航和路由、狀態管理、非同步處理、HTTP請求和Rest API,以及數據持久化。這些主題在實際應用中都非常重要,幫助你構建更複雜、功能更強大的 Flutter 應用。 ...
  • 在SpringBoot的Controller中,可以使用註解@RequestBody來獲取POST請求中的JSON數據。我們可以將這個註解應用到一個Controller方法的參數上,Spring將會負責讀取請求正文中的數據,將其反序列化為一個Java對象,並將其作為Controller方法的參數傳遞 ...
  • 對於從事後端開發的同學來說,線程安全問題是我們每天都需要考慮的問題。 線程安全問題通俗地講主要是在多線程的環境下,不同線程同時讀和寫公共資源(臨界資源)導致的數據異常問題。 比如:變數a=0,線程1給該變數+1,線程2也給該變數+1。此時,線程3獲取a的值有可能不是2,而是1。線程3這不就獲取了錯誤 ...
  • ### 歡迎訪問我的GitHub > 這裡分類和彙總了欣宸的全部原創(含配套源碼):[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) ### 關於bean的作用域(scope) - 官方資料:ht ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...