jdk線程池ThreadPoolExecutor工作原理解析(自己動手實現線程池)(一) 線程池介紹 在日常開發中經常會遇到需要使用其它線程將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用線程的過程中卻存在著兩個痛點。 在java等很多主流語言中每個邏輯上的線程底層都對應著一個系統線 ...
jdk線程池ThreadPoolExecutor工作原理解析(自己動手實現線程池)(一)
線程池介紹
在日常開發中經常會遇到需要使用其它線程將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用線程的過程中卻存在著兩個痛點。
- 在java等很多主流語言中每個邏輯上的線程底層都對應著一個系統線程(不考慮虛擬線程的情況)。操作系統創建一個新線程是存在一定開銷的,
在需要執行大量的非同步任務時,如果處理每個任務時都直接向系統申請創建一個線程來執行,併在任務執行完畢後再回收線程,則創建/銷毀大量線程的開銷將無法忍受。 - 每個系統線程都會占用一定的記憶體空間,且系統在調度不同線程上下文切換時存在一定的cpu開銷。因此在一定的硬體條件下,操作系統能同時維護的系統線程個數相對而言是比較有限的。
在使用線程的過程中如果沒有控制好流量,會很容易創建過多的線程而耗盡系統資源,令系統變得不可用。
而線程池正是為解決上述痛點而生的,其通過兩個手段來解決上述痛點。
池化線程資源
池化線程資源,顧名思義就是維護一個存活線程的集合(池子)。提交任務的用戶程式不直接控制線程的創建和銷毀,不用每次執行任務時都申請創建一個新線程,而是通過線程池間接的獲得線程去處理非同步任務。
線程池中的線程在執行完任務後通常也不會被系統回收掉,而是繼續待在池子中用於執行其它的任務(執行堆積的待執行任務或是等待新任務)。
線程池通過池化線程資源,避免了系統反覆創建/銷毀線程的開銷,大幅提高了處理大規模非同步任務時的性能。
對線程資源的申請進行收口,限制系統資源的使用
如果程式都統一使用線程池來處理非同步任務,則線程池內部便可以對系統資源的使用施加一定限制。
例如用戶可以指定一個線程池最大可維護的線程數量,以避免耗盡系統資源。
當用戶提交任務的速率過大,導致線程池中的線程數到達指定的最大值時依然無法滿足需求時,線程池可以通過丟棄部分任務或限制提交任務的流量的方式來處理這一問題。
線程池通過對線程資源的使用進行統一收口,用戶可以通過設置線程池的參數來控制系統資源的使用,從而避免系統資源耗盡。
jdk線程池ThreadPoolExecutor簡單介紹
前面介紹了線程池的概念,而要深入理解線程池的工作原理最好的辦法便是找到一個優秀的線程池實現來加以研究。
而自jdk1.5中引入的通用線程池框架ThreadPoolExecutor便是一個很好的學習對象。其內部實現不算複雜,卻在高效實現核心功能的同時還提供了較豐富的拓展能力。
下麵從整體上介紹一下jdk通用線程池ThreadPoolExecutor的工作原理(基於jdk8)。
ThreadPoolExecutor運行時工作流程
首先ThreadPoolExecutor允許用戶從兩個不同維度來控制線程資源的使用,即最大核心線程數(corePoolSize)和最大線程數(maximumPoolSize)。
最大核心線程數:核心線程指的是通常常駐線程池的線程。常駐線程線上程池沒有任務空閑時也不會被銷毀,而是處於idle狀態,這樣在新任務到來時就能很快的進行響應。
最大線程數:和第一節中提到的一樣,即線程池中所能允許的活躍線程的最大數量。
在向ThreadPoolExecutor提交任務時(execute方法),會執行一系列的判斷來決定任務應該如何被執行(源碼在下一節中具體分析)。
- 首先判斷當前活躍的線程數是否小於指定的最大核心線程數corePoolSize。
如果為真,則說明當前線程池還未完成預熱,核心線程數不飽和,創建一個新線程來執行該任務。
如果為假,則說明當前線程池已完成預熱,進行下一步判斷。 - 嘗試將當前任務放入工作隊列workQueue(阻塞隊列BlockingQueue),工作隊列中的任務會被線程池中的活躍線程按入隊順序逐個消費。
如果入隊成功,則說明當前工作隊列未滿,入隊的任務將會被線程池中的某個活躍線程所消費並執行。
如果入隊失敗,則說明當前工作隊列已飽和,線程池消費任務的速度可能太慢了,可能需要創建更多新線程來加速消費,進行下一步判斷。 - 判斷當前活躍的線程數是否小於指定的最大線程數maximumPoolSize。
如果為真,則說明當前線程池所承載的線程數還未達到參數指定的上限,還有餘量來創建新的線程加速消費,創建一個新線程來執行該任務。
如果為假,則說明當前線程池所承載的線程數達到了上限,但處理任務的速度依然不夠快,需要觸發拒絕策略。
ThreadPoolExecutor優雅停止
線程池的優雅停止一般要能做到以下幾點:
- 線程池在中止後不能再受理新的任務
- 線程池中止的過程中,已經提交的現存任務不能丟失(等待剩餘任務執行完再關閉或者能夠把剩餘的任務吐出來還給用戶)
- 線程池最終關閉前,確保創建的所有工作線程都已退出,不會出現資源的泄露
線程池自啟動後便會有大量的工作線程在內部持續不斷併發的執行提交的各種任務,而要想做到優雅停止並不是一件容易的事情。
因此ThreadPoolExecutor中最複雜、細節最多的部分並不在於上文中的正常工作流程,而在於分散在各個地方但又緊密協作的,控制優雅停止的邏輯。
ThreadPoolExecutor的其它功能
除了正常的工作流程以及優雅停止的功能外,ThreadPoolExecutor還提供了一些比較好用的功能
- 提供了很多protected修飾的鉤子函數,便於用戶繼承並實現自己的線程池時進行一定的拓展
- 在運行時統計了總共執行的任務數等關鍵指標,並提供了對應的api便於用戶在運行時觀察運行狀態
- 允許線上程池運行過程中動態修改關鍵的配置參數(比如corePoolSize等),並實時的生效。
jdk線程池ThreadPoolExecutor源碼解析(自己動手實現線程池v1版本)
如費曼所說:What I can not create I do not understand(我不能理解我創造不了的東西)。
通過模仿jdk的ThreadPoolExecutor實現,從零開始實現一個線程池,可以迫使自己去仔細的捋清楚jdk線程池中設計的各種細節,加深理解而達到更好的學習效果。
前面提到ThreadPoolExecutor的核心邏輯主要分為兩部分,一是正常運行時處理提交的任務的邏輯,二是實現優雅停止的邏輯。
因此我們實現的線程池MyThreadPoolExecutor(以My開頭用於區分)也會分為兩個版本,v1版本只實現前一部分即正常運行時執行任務的邏輯,將有關線程池優雅停止的邏輯全部去除。
相比直接啃jdk最終實現的源碼,v1版本的實現會更簡單更易理解,讓正常執行任務時的邏輯更加清晰而不會耦合太多關於優雅停止的邏輯。
線程池關鍵成員變數介紹
ThreadPoolExecutor中有許多的成員變數,大致可以分為三類。
可由用戶自定義的、用於控制線程池運行的配置參數
- volatile int corePoolSize(最大核心線程數量)
- volatile int maximumPoolSize(最大線程數量)
- volatile long keepAliveTime(idle線程保活時間)
- final BlockingQueue workQueue(工作隊列(阻塞隊列))
- volatile ThreadFactory threadFactory(工作線程工廠)
- volatile RejectedExecutionHandler handler(拒絕異常處理器)
- volatile boolean allowCoreThreadTimeOut(是否允許核心線程在idle超時後退出)
其中前6個配置參數都可以在ThreadPoolExecutor的構造函數中指定,而allowCoreThreadTimeOut則可以通過暴露的public方法allowCoreThreadTimeOut來動態的設置。
其中大部分屬性都是volatile修飾的,目的是讓運行過程中可以用過提供的public方法動態修改這些值後,線程池中的工作線程或提交任務的用戶線程能及時的感知到變化(線程間的可見性),併進行響應(比如令核心線程自動的idle退出)
這些配置屬性具體如何控制線程池行為的原理都會在下麵的源碼解析中展開介紹。理解這些參數的工作原理後才能在實際的業務中使用線程池時為其設置合適的值。
僅供線程池內部工作時使用的屬性
- ReentrantLock mainLock(用於控制各種臨界區邏輯的併發)
- HashSet
workers(當前活躍工作線程Worker的集合,工作線程的工作原理會在下文介紹) - AtomicInteger ctl(線程池控制狀態,control的簡寫)
這裡重點介紹一下ctl屬性。ctl雖然是一個32位的整型欄位(AtomicInteger),但實際上卻用於標識兩個業務屬性,即當前線程池的運行狀態和worker線程的總數量。
線上程池初始化時狀態位RUNNING,worker線程數量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位中的高3位用於標識線程池當前的狀態,剩餘的29位用於標識線程池中worker線程的數量(因此理論上ThreadPoolExecutor最大可容納的線程數並不是231-1(32位中符號要占一位),而是229-1)
由於聚合之後單獨的讀寫某一個屬性不是很方便,所以ThreadPoolExecutor中提供了很多基於位運算的輔助函數來簡化這些邏輯。
ctl這樣聚合的設計比起拆分成兩個獨立的欄位有什麼好處?
在ThreadPoolExecutor中關於優雅停止的邏輯中有很多地方是需要同時判斷當前工作線程數量與線程池狀態後,再對線程池狀態工作線程數量進行更新的(具體邏輯在下一篇v2版本的博客中展開)。
且為了執行效率,不使用互斥鎖而是通過cas重試的方法來解決併發更新的問題。而對一個AtomicInteger屬性做cas重試的更新,要比同時控制兩個屬性進行cas的更新要簡單很多,執行效率也高很多。
ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING)。
但由於v1版本的MyThreadPoolExecutorV1不支持優雅停止,所以不在本篇博客中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的博客中展開)
記錄線程池運行過程中的一些關鍵指標
- completedTaskCount(線程池自啟動後已完成的總任務數)
- largestPoolSize(線程池自啟動後工作線程個數的最大值)
在運行過程中,ThreadPoolExecutor會在對應的地方進行埋點,統計一些指標並提供相應的api給用戶實時的查詢,以提高線程池工作時的可觀測性。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
/**
* 指定的最大核心線程數量
* */
private volatile int corePoolSize;
/**
* 指定的最大線程數量
* */
private volatile int maximumPoolSize;
/**
* 線程保活時間(單位:納秒 nanos)
* */
private volatile long keepAliveTime;
/**
* 存放任務的工作隊列(阻塞隊列)
* */
private final BlockingQueue<Runnable> workQueue;
/**
* 線程工廠
* */
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略
* */
private volatile MyRejectedExecutionHandler handler;
/**
* 是否允許核心線程在idle一定時間後被銷毀(和非核心線程一樣)
* */
private volatile boolean allowCoreThreadTimeOut;
/**
* 主控鎖
* */
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 當前線程池已完成的任務數量
* */
private long completedTaskCount;
/**
* 維護當前存活的worker線程集合
* */
private final HashSet<MyWorker> workers = new HashSet<>();
/**
* 當前線程池中存在的worker線程數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
* v1版本只關心前者,即worker線程數量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位的有符號整數,有3位是用來存放線程池狀態的,所以用來維護當前工作線程個數的部分就只能用29位了
* 被占去的3位中,有1位原來的符號位,2位是原來的數值位。
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 線程池狀態poolStatus常量(狀態值只會由小到大,單調遞增)
* 線程池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著線程池處於正常運行的狀態。能正常的接收並處理提交的任務
* 線程池對象初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表線程池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作隊列中積壓的任務處理完
* 調用了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表線程池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作隊列中積壓的任務,當前還在處理任務的工作線程將收到interrupt中斷通知
* 之前未調用shutdown方法,直接調用了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先調用了shutdown方法,後調用了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著線程池即將完全終止,正在做最後的收尾工作
* 當前線程池狀態為SHUTDOWN,任務被消費完工作隊列workQueue為空,且工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前線程池狀態為STOP,工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著線程池完全的關閉。之前線程池已經處於TIDYING狀態,且調用的鉤子函數terminated已返回
* 當前線程池狀態為TIDYING,調用的鉤子函數terminated已返回
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 跟蹤線程池曾經有過的最大線程數量(只能在mainLock的併發保護下更新)
*/
private int largestPoolSize;
private boolean compareAndIncrementWorkerCount(int expect) {
return this.ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {
// cas更新,workerCount自減1
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
public MyThreadPoolExecutorV1(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
MyRejectedExecutionHandler handler) {
// 基本的參數校驗
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (unit == null || workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
// 設置成員變數
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
Worker工作線程
ThreadPoolExecutor中的工作線程並不是裸的Thread,而是被封裝在了一個Worker的內部類中。
Worker實現了Runnable所以可以作為一個普通的線程來啟動,在run方法中只是簡單的調用了一下runWorker(runWorker後面再展開)。
Worker類有三個成員屬性:
- Thread thread(被封裝的工作線程對象)
- Runnable firstTask(提交任務時,創建新Worker對象時指定的第一次要執行的任務(後續線程就會去拉取工作隊列里的任務執行了))
- volatile long completedTasks(統計用,計算當前工作線程總共完成了多少個任務)
Worker內封裝的實際的工作線程對象thread,其在構造函數中由線程池的線程工廠threadFactory生成,傳入this,所以thread在start後,便會調用run方法進而執行runWorker。
線程工廠可以由用戶在創建線程池時通過參數指定,因此用戶在自由控制所生成的工作線程的同時,也需要保證newThread能正確的返回一個可用的線程對象。
除此之外,Worker對象還繼承了AbstractQueuedSynchronizer(AQS)類,簡單的實現了一個不可重入的互斥鎖。
對AQS互斥模式不太瞭解的讀者可以參考一下我之前關於AQS互斥模式的博客:AQS互斥模式與ReentrantLock可重入鎖原理解析
AQS中維護了一個volatile修飾的int類型的成員變數state,其具體的含義可以由使用者自己定義。
在Worker中,state的值有三種狀態:
- state=-1,標識工作線程還未啟動(不會被interruptIfStarted打斷)
- state=0,標識工作線程已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
- state=1,標識worker線程正在執行任務(runWorker方法中,成功獲得任務後,通過lock方法將state設置為1)
具體這三種情況分別在什麼時候出現會在下麵解析提交任務源碼的那部分里詳細介紹。
/**
* jdk的實現中令Worker繼承AbstractQueuedSynchronizer並實現了一個不可重入的鎖
* AQS中的state屬性含義
* -1:標識工作線程還未啟動
* 0:標識工作線程已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
* 1:標識worker線程正在執行任務(runWorker中,成功獲得任務後,通過lock方法將state設置為1)
* */
private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
public MyWorker(Runnable firstTask) {
this.firstTask = firstTask;
// newThread可能是null
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(){
acquire(1);
}
public boolean tryLock(){
return tryAcquire(1);
}
public void unlock(){
release(1);
}
public boolean isLocked(){
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
// 三個條件同時滿足,才去中斷Worker對應的thread
// getState() >= 0,用於過濾還未執行runWorker的,剛入隊初始化的Worker
// thread != null,用於過濾掉構造方法中ThreadFactory.newThread返回null的Worker
// !t.isInterrupted(),用於過濾掉那些已經被其它方式中斷的Worker線程(比如用戶自己去觸發中斷,提前終止線程池中的任務)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute執行提交的任務
下麵介紹本篇博客的重點,即線程池是如何執行用戶所提交的任務的。
用戶提交任務的入口是public的execute方法,Runnable類型的參數command就是提交的要執行的任務。
MyThreadPoolExecutorV1的execute方法(相比jdk的實現v1版本去掉了關於優雅停止的邏輯)
/**
* 提交任務,並執行
* */
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command參數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker線程數量低於指定的核心線程數量,則創建新的核心線程
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接返回即可
return;
}
}
// 走到這裡有兩種情況
// 1 因為核心線程超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞隊列
// 2 addWorker返回false,創建核心工作線程失敗
if(this.workQueue.offer(command)){
// workQueue.offer入隊成功
if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心線程
// 一個任務在入隊之後,如果當前線程池中一個線程都沒有,則需要兜底的創建一個非核心線程來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊後創建線程去拉取任務並執行
addWorker(null,false);
}else{
// 加入隊列成功,且當前存在worker線程,成功返回
return;
}
}else{
// 阻塞隊列已滿,嘗試創建一個新的非核心線程處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 創建非核心線程失敗,執行拒絕策略(失敗的原因和前面創建核心線程addWorker的原因類似)
reject(command);
}else{
// 創建非核心線程成功,成功返回
return;
}
}
}
/**
* 根據指定的拒絕處理器,執行拒絕策略
* */
private void reject(Runnable command) {
this.handler.rejectedExecution(command, this);
}
可以看到,execute方法源碼中對於任務處理的邏輯很清晰,也能與ThreadPoolExecutor運行時工作流程中所介紹的流程所匹配。
addWorker方法(創建新的工作線程)
在execute方法中當需要創建核心線程或普通線程時,便需要通過addWorker方法嘗試創建一個新的工作線程。
/**
* 向線程池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層迴圈
retry:
for (;;) {
int currentCtl = ctl.get();
// 用於cas更新workerCount的內層迴圈(註意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 創建的是核心線程,判斷當前線程數是否已經超過了指定的核心線程數
if (workerCount >= this.corePoolSize) {
// 超過了核心線程數,創建核心worker線程失敗
return false;
}
} else {
// 創建的是非核心線程,判斷當前線程數是否已經超過了指定的最大線程數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大線程數,創建非核心worker線程失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層迴圈
break retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
}
}
boolean workerStarted = false;
MyWorker newWorker = null;
try {
// 創建一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部線程創建成功
// 加鎖,防止併發更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorkerThread.isAlive()) {
// 預檢查線程的狀態,剛初始化的worker線程必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活線程數,將其更新
largestPoolSize = workerSize;
}
// 創建成功
} finally {
// 無論是否發生異常,都先將主控鎖解鎖
mainLock.unlock();
}
// 加入成功,啟動worker線程
myWorkerThread.start();
// 標識為worker線程啟動成功,並作為返回值返回
workerStarted = true;
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
addWorker可以分為兩部分:判斷當前是否滿足創建新工作線程的條件、創建並啟動新的Worker工作線程。
判斷當前是否滿足創建新工作線程的條件
入口處開始的retry標識的for迴圈部分,便是用於判斷是否滿足創建新工作線程的條件。
- 首先判斷當前工作線程數量是否超過了理論的最大值CAPACITY(即2^29-1),超過了則不能創建,返回false,不創建新工作線程
- 根據boolean類型參數core判斷是否創建核心工作線程,core=true則判斷是否超過了corePoolSize的限制,core=false則判斷是否超過了maximumPoolSize的限制。不滿足則返回false,不創建新工作線程
- 滿足上述限制條件後,則說明可以創建新線程了,compareAndIncrementWorkerCount方法進行cas的增加當前工作線程數。
如果cas失敗,則說明存在併發的更新了,則再一次的迴圈重試,並再次的進行上述檢查。
需要註意的是:這裡面有兩個for迴圈的原因在於v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層迴圈的)。如果線程池處於停止狀態則不能再創建新工作線程了,因此也需要判斷線程池當前的狀態,
不滿足條件則也需要返回false,不創建工作線程。
而且compareAndIncrementWorkerCount中cas更新ctl時,如果併發的線程池被停止而導致線程池狀態發生了變化,也會導致cas失敗重新檢查。
這也是jdk的實現中為什麼把線程池狀態和工作線程數量綁定在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個欄位的併發爭搶。(更具體的細節會在下一篇博客的v2版本中介紹)
創建並啟動新的Worker工作線程
在通過retry那部分的層層條件檢查後,緊接著便是實際創建新工作線程的邏輯。
- 首先通過Worker的構造方法創建一個新的Worker對象,並將用戶提交的任務作為firstTask參數傳入。
- 判斷Worker在構造時線程工廠是否正確的生成了一個Thread(判空),如果thread == null的話直接返回false,標識創建新工作線程失敗。
- 在mainLock的保護下,將新創建的worker線程加入workers集合中
- 啟動Worker中的線程(myWorkerThread.start()),啟動後會執行Worker類中的run方法,新的工作線程會執行runWorker方法(下文會展開分析runWorker)
- 如果Worker中的線程不是alive狀態等原因導致工作線程啟動失敗,則在finally中通過addWorkerFailed進行一系列的回滾操作
雖然在前麵線程池工作流程的分析中提到了核心線程與非核心線程的概念,但Worker類中實際上並沒有核心/非核心的標識。
經過了工作線程啟動前的條件判斷後,新創建的工作線程實際上並沒有真正的核心與非核心的差別。
addWorkerFailed(addWorker的逆向回滾操作)
addWorker中工作線程可能會啟動失敗,所以要對addWorker中對workers集合以及workerCount等數據的操作進行回滾。
/**
* 當創建worker出現異常失敗時,對之前的操作進行回滾
* 1 如果新創建的worker加入了workers集合,將其移除
* 2 減少記錄存活的worker個數(cas更新)
* 3 檢查線程池是否滿足中止的狀態,防止這個存活的worker線程阻止線程池的中止(v1版本不考慮,省略了tryTerminate)
*/
private void addWorkerFailed(MyWorker myWorker) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorker != null) {
// 如果新創建的worker加入了workers集合,將其移除
workers.remove(myWorker);
}
// 減少存活的worker個數
decrementWorkerCount();
// 嘗試著將當前worker線程終止(addWorkerFailed由工作線程自己調用)
// tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker(工作線程核心執行邏輯)
前面介紹了用戶如何向線程池提交任務,以及如何創建新工作線程Worker,下麵介紹工作線程線上程池中是如何運行的。
- runWorker方法內部本質上是一個無限迴圈,在進入主迴圈之前通過unlock方法,將內部AQS父類中的state標識為0,允許被外部中斷(可以被interruptIfStarted選中而打斷)
- 之後便是主迴圈,如果firstTask不為空(說明第一次啟動),則直接調用task.run方法。否則通過getTask方法嘗試從工作隊列中撈取一個任務來執行
- 在實際的任務執行前和執行後都調用對應的鉤子方法(beforeExecute、afterExecute)
- 在任務執行前通過lock方法將AQS的state方法設置為1代表當前Worker正在執行任務,併在執行完一個任務後在finally中進行unlock解鎖,令當前工作線程進入idle狀態。
同時清空firstTask的值(清空後下一次迴圈就會通過getTask獲取任務了)並令Worker中的completedTasks統計指標也自增1 - 如果任務執行過程中出現了異常,則catch住並最終向上拋出跳出主迴圈,finally中執行processWorkerExit(認為任務一旦執行出現了異常,則很可能工作線程內部的一些狀態已經損壞,需要重新創建一個新的工作線程來代替出異常的老工作線程)
- 有兩種情況會導致執行processWorkerExit,一種是上面說的任務執行時出現了異常,此時completedAbruptly=true;還有一種可能時getTask因為一些原因返回了null,此時completedAbruptly=false。
completedAbruptly會作為processWorkerExit的參數傳遞。
/**
* worker工作線程主迴圈執行邏輯
* */
private void runWorker(MyWorker myWorker) {
// 時worker線程的run方法調用的,此時的current線程的是worker線程
Thread workerThread = Thread.currentThread();
Runnable task = myWorker.firstTask;
// 已經暫存了firstTask,將其清空(有地方根據firstTask是否存在來判斷工作線程中負責的任務是否是新提交的)
myWorker.firstTask = null;
// 將state由初始化時的-1設置為0
// 標識著此時當前工作線程開始工作了,這樣可以被interruptIfStarted選中
myWorker.unlock();
// 預設線程是由於中斷退出的
boolean completedAbruptly = true;
try {
// worker線程處理主迴圈,核心邏輯
while (task != null || (task = getTask()) != null) {
// 將state由0標識為1,代表著其由idle狀態變成了正在工作的狀態
// 這樣interruptIdleWorkers中的tryLock會失敗,這樣工作狀態的線程就不會被該方法中斷任務的正常執行
myWorker.lock();
// v1版本此處省略優雅停止相關的核心邏輯
try {
// 任務執行前的鉤子函數
beforeExecute(workerThread, task);
Throwable thrown = null;
try {
// 拿到的任務開始執行
task.run();
} catch (RuntimeException | Error x) {
// 使用thrown收集拋出的異常,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主迴圈
throw x;
} catch (Throwable x) {
// 使用thrown收集拋出的異常,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主迴圈
throw new Error(x);
} finally {
// 任務執行後的鉤子函數,如果任務執行時拋出了錯誤/異常,thrown不為null
afterExecute(task, thrown);
}
} finally {
// 將task設置為null,令下一次while迴圈通過getTask獲得新任務
task = null;
// 無論執行時是否存在異常,已完成的任務數加1
myWorker.completedTasks++;
// 無論如何將myWorker解鎖,標識為idle狀態
myWorker.unlock();
}
}
// getTask返回了null,說明沒有可執行的任務或者因為idle超時、線程數超過配置等原因需要回收當前線程。
// 線程正常的退出,completedAbruptly為false
completedAbruptly = false;
}finally {
// getTask返回null,線程正常的退出,completedAbruptly值為false
// task.run()執行時拋出了異常/錯誤,直接跳出了主迴圈,此時completedAbruptly為初始化時的預設值true
processWorkerExit(myWorker, completedAbruptly);
// processWorkerExit執行完成後,worker線程對應的run方法(run->runWorker)也會執行完畢
// 此時線程對象會進入終止態,等待操作系統回收
// 而且processWorkerExit方法內將傳入的Worker從workers集合中移除,jvm中的對象也會因為不再被引用而被GC回收
// 此時,當前工作線程所占用的所有資源都已釋放完畢
}
}
getTask嘗試獲取任務執行
runWorker中是通過getTask獲取任務的,getTask中包含著工作線程是如何從工作隊列中獲取任務的關鍵邏輯。
- 在獲取任務前,需要通過getTask檢查當前線程池的線程數量是否超過了參數配置(啟動後被動態調整了),因此需要先獲得當前線程池工作線程總數workCount。
如果當前工作線程數量超過了指定的最大線程個數maximumPoolSize限制,則說明當前線程需要退出了 - timed標識用於決定當前線程如何從工作隊列(阻塞隊列)中獲取新任務,如果timed為true則通過poll方法獲取同時指定相應的超時時間(配置參數keepAliveTime),如果timed為false則通過take方法無限期的等待。
如果工作隊列並不為空,則poll和take方法都會立即返回一個任務對象。而當工作隊列為空時,工作線程則會阻塞在工作隊列上以讓出CPU(idle狀態)直到有新的任務到來而被喚醒(或者超時喚醒)。
這也是存儲任務的workQueue不能是普通的隊列,而必須是阻塞隊列的原因。(對阻塞隊列工作原理不太清楚的讀者可以參考我以前的博客:自己動手實現一個阻塞隊列) - timed的值由兩方面共同決定。一是配置參數allowCoreThreadTimeOut是否為true,為true的話說明不管是核心線程還是非核心線程都需要在idle等待keepAliveTime後銷毀退出。所以allowCoreThreadTimeOut=true,則timed一定為true
二是如果allowCoreThreadTimeOut為false,說明核心線程不需要退出,而非核心線程在idle等待keepAliveTime後需要銷毀退出。則判斷當前workCount是否大於配置的corePoolSize,是的話則timed為true否則為false。
如果當前線程數超過了指定的最大核心線程數corePoolSize,則需要讓工作隊列為空時(說明線程池負載較低)部分idle線程退出,使得最終活躍的線程數減少到和corePoolSize一致。
從這裡可以看到,核心與非核心線程的概念在ThreadPoolExecutor里是很弱的,不關心工作線程最初是以什麼原因創建的都一視同仁,誰都可能被當作非核心線程而銷毀退出。 - timedOut標識當前工作線程是否因為poll拉取任務時出現了超時。take永遠不會返回null,因此只有poll在超時時會返回null,當poll返回值為null時,表明是等待了keepAliveTime時間後超時了,所以timedOut標識為true。
同時如果拉取任務時線程被中斷了,則捕獲InterruptedException異常,將timeOut標識為false(被中斷的就不認為是超時)。 - 當(workCount > maximumPoolSize)或者 (timed && timedOut)兩者滿足一個時,就說明當前線程應該要退出了。
此時將當前的workCount用cas的方式減去1,返回null代表獲取任務失敗即可;如果cas失敗,則在for迴圈中重試。
但有一種情況是例外的(workCount <= 1 && !workQueue.isEmpty()),即當前工作線程數量恰好為1,且工作隊列不為空(那麼還需要當前線程繼續工作把工作隊列里的任務都消費掉,無論如何不能退出)
/**
* 嘗試著從阻塞隊列里獲得待執行的任務
* @return 返回null代表工作隊列為空,沒有需要執行的任務; 或者當前worker線程滿足了需要退出的一些條件
* 返回對應的任務
* */
private Runnable getTask() {
boolean timedOut = false;
for(;;) {
int currentCtl = ctl.get();
// 獲得當前工作線程個數
int workCount = workerCountOf(currentCtl);
// 有兩種情況需要指定超時時間的方式從阻塞隊列workQueue中獲取任務(即timed為true)
// 1.線程池配置參數allowCoreThreadTimeOut為true,即允許核心線程在idle一定時間後被銷毀
// 所以allowCoreThreadTimeOut為true時,需要令timed為true,這樣可以讓核心線程也在一定時間內獲取不到任務(idle狀態)而被銷毀
// 2.線程池配置參數allowCoreThreadTimeOut為false,但當前線程池中的線程數量workCount大於了指定的核心線程數量corePoolSize
// 說明當前有一些非核心的線程正在工作,而非核心的線程在idle狀態一段時間後需要被銷毀
// 所以此時也令timed為true,讓這些線程在keepAliveTime時間內由於隊列為空拉取不到任務而返回null,將其銷毀
boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
// 有共四種情況不需要往下執行,代表
// 1 (workCount > maximumPoolSize && workCount > 1)
// 當前工作線程個數大於了指定的maximumPoolSize(可能是由於啟動後通過setMaximumPoolSize調小了maximumPoolSize的值)
// 已經不符合線程池的配置參數約束了,要將多餘的工作線程回收掉
// 且當前workCount > 1說明存在不止一個工作線程,意味著即使將當前工作線程回收後也還有其它工作線程能繼續處理工作隊列里的任務,直接返回null表示自己需要被回收
// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
// 當前工作線程個數大於了指定的maximumPoolSize(maximumPoolSize被設置為0了)
// 已經不符合線程池的配置參數約束了,要將多餘的工作線程回收掉
// 但此時workCount<=1,說明將自己這個工作線程回收掉後就沒有其它工作線程能處理工作隊列里剩餘的任務了
// 所以即使maximumPoolSize設置為0,也需要等待任務被處理完,工作隊列為空之後才能回收當前線程,否則還會繼續拉取剩餘任務
// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次迴圈後timed && timedOut成立,說明當前worker線程處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前線程
// 且當前workCount > 1說明存在不止一個工作線程,意味著即使將當前工作線程回收後也還有其它工作線程能繼續處理工作隊列里的任務,直接返回null表示自己需要被回收
// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次迴圈後timed && timedOut成立,說明當前worker線程處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前線程
// 但此時workCount<=1,說明將自己這個工作線程回收掉後就沒有其它工作線程能處理工作隊列里剩餘的任務了
// 所以即使timed && timedOut超時邏輯匹配,也需要等待任務被處理完,工作隊列為空之後才能回收當前線程,否則還會繼續拉取剩餘任務
if ((workCount > maximumPoolSize || (timed && timedOut))
&& (workCount > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(currentCtl)) {
// 滿足上述條件,說明當前線程需要被銷毀了,返回null
return null;
}
// compareAndDecrementWorkerCount方法由於併發的原因cas執行失敗,continue迴圈重試
continue;
}
try {
// 根據上面的邏輯的timed標識,決定以什麼方式從阻塞隊列中獲取任務
Runnable r = timed ?
// timed為true,通過poll方法指定獲取任務的超時時間(如果指定時間內沒有隊列依然為空,則返回)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed為false,通過take方法無限期的等待阻塞隊列中加入新的任務
workQueue.take();
if (r != null) {
// 獲得了新的任務,getWork正常返回對應的任務對象
return r;
}else{
// 否則說明timed=true,且poll拉取任務時超時了
timedOut = true;
}
} catch (InterruptedException retry) {
// poll or take任務等待時worker線程被中斷了,捕獲中斷異常
// timeout = false,標識拉取任務時沒有超時
timedOut = false;
}
}
}
processWorkerExit(處理工作線程退出)
在runWorker中,如果getTask方法沒有拿到任務返回了null或者任務在執行時拋出了異常就會在最終的finally塊中調用processWorkerExit方法,令當前工作線程銷毀退出。
- processWorkerExit方法內會將當前線程占用的一些資源做清理,比如從workers中移除掉當前線程(利於Worker對象的GC),並令當前線程workerCount減一(completedAbruptly=true,說明是中斷導致的退出,getTask中沒來得及減workerCount,在這裡補正)
- completedAbruptly=true,說明是runWorker中任務異常導致的線程退出,無條件的通過addWorker重新創建一個新的工作線程代替當前退出的工作線程。
- completedAbruptly=false,在退出當前工作線程後,需要判斷一下退出後當前所存活的工作線程數量是否滿足要求。
比如allowCoreThreadTimeOut=false時,當前工作線程個數是否不低於corePoolSize等,如果不滿足要求則通過addWorker重新創建一個新的線程。
工作線程退出時所占用資源的回收
- processWorkerExit方法執行完畢後,當前工作線程就完整的從當前線程池中退出了(workers中沒有了引用,workerCount減1了),GC便會將記憶體中的Worker對象所占用的記憶體給回收掉。
- 同時runWorker中最後執行完processWorkerExit後,工作線程的run方法也return了,標識著整個線程正常退出了,操作系統層面上也會將線程轉為終止態並最終回收。至此,線程占用的所有資源就被徹底的回收乾凈了。
/**
* 處理worker線程退出
* @param myWorker 需要退出的工作線程對象
* @param completedAbruptly 是否是因為中斷異常的原因,而需要回收
* */
private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
if (completedAbruptly) {
// 如果completedAbruptly=true,說明是任務在run方法執行時出錯導致的線程退出
// 而正常退出時completedAbruptly=false,在getTask中已經將workerCount的值減少了
decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 線程池全局總完成任務數累加上要退出的工作線程已完成的任務數
this.completedTaskCount += myWorker.completedTasks;
// workers集合中將當前工作線程剔除
workers.remove(myWorker);
// completedTaskCount是long類型的,workers是HashSet,
// 都是非線程安全的,所以在mainLock的保護進行修改
} finally {
mainLock.unlock();
}
int currentCtl = this.ctl.get();
if (!completedAbruptly) {
// completedAbruptly=false,說明不是因為中斷異常而退出的
// min標識當前線程池允許的最小線程數量
// 1 如果allowCoreThreadTimeOut為true,則核心線程也可以被銷毀,min=0
// 2 如果allowCoreThreadTimeOut為false,則min應該為所允許的核心線程個數,min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
// 如果min為0了,但工作隊列不為空,則修正min=1,因為至少需要一個工作線程來將工作隊列中的任務消費、處理掉
min = 1;
}
if (workerCountOf(currentCtl) >= min) {
// 如果當前工作線程數大於了min,當前線程數量是足夠的,直接返回(否則要執行下麵的addWorker恢復)
return;
}
}
// 兩種場景會走到這裡進行addWorker操作
// 1 completedAbruptly=true,說明線程是因為中斷異常而退出的,需要重新創建一個新的工作線程
// 2 completedAbruptly=false,且上面的workerCount<min,則說明當前工作線程數不夠,需要創建一個
// 為什麼參數core傳的是false呢?
// 因為completedAbruptly=true而中斷退出的線程,無論當前工作線程數是否大於核心線程,都需要創建一個新的線程來代替原有的被退出的線程
addWorker(null, false);
}
動態修改配置參數
ThreadPoolExecutor除了支持啟動前通過構造函數設置配置參數外,也允許線上程池運行的過程中動態的更改配置。而要實現動態的修改配置,麻煩程度要比啟動前靜態的指定大得多。
舉個例子,線上程池的運行過程中如果當前corePoolSize=20,且已經創建了20個核心線程時(workerCount=20),現在將corePoolSize減少為10或者增大為30時應該如何實時的生效呢?
下麵通過內嵌於代碼中的註釋,詳細的說明瞭allowCoreThreadTimeOut、corePoolSize、maximumPoolSize這三個關鍵配置參數實現動態修改的原理。
/**
* 設置是否允許核心線程idle超時後退出
* */
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 判斷一下新舊值是否相等,避免無意義的volatile變數更新,導致不必要的cpu cache同步
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
// 參數值value為true,說明之前不允許核心線程由於idle超時而退出
// 而此時更新為true說明現在允許了,則通過interruptIdleWorkers喚醒所有的idle線程
// 令其走一遍runWorker中的邏輯,嘗試著讓idle超時的核心線程及時銷毀
interruptIdleWorkers();
}
}
}
/**
* 動態更新核心線程最大值corePoolSize
* */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException();
}
// 計算差異
int delta = corePoolSize - this.corePoolSize;
// 賦值
this.corePoolSize = corePoolSize;
if (workerCountOf(this.ctl.get()) > corePoolSize) {
// 更新完畢後,發現當前工作線程數超過了指定的值
// 喚醒所有idle線程,讓目前空閑的idle超時的線程在workerCount大於maximumPoolSize時及時銷毀
interruptIdleWorkers();
} else if (delta > 0) {
// 差異大於0,代表著新值大於舊值
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 我們無法確切的知道有多少新的線程是所需要的。
// 啟髮式的預先啟動足夠的新工作線程用於處理工作隊列中的任務
// 但當執行此操作時工作隊列為空了,則立即停止此操作(隊列為空了說明當前負載較低,再創建更多的工作線程是浪費資源)
// 取差異和當前工作隊列中的最小值為k
int k = Math.min(delta, workQueue.size());
// 嘗試著一直增加新的工作線程,直到和k相同
// 這樣設計的目的在於控制增加的核心線程數量,不要一下子創建過多核心線程
// 舉個例子:原來的corePoolSize是10,且工作線程數也是10,現在新值設置為了30,新值比舊值大20,理論上應該直接創建20個核心工作線程
// 而工作隊列中的任務數只有10,那麼這個時候直接創建20個新工作線程是沒必要的,只需要一個一個創建,在創建的過程中新的線程會儘量的消費工作隊列中的任務
// 這樣就可以以一種啟發性的方式創建合適的新工作線程,一定程度上節約資源。後面再有新的任務提交時,再從runWorker方法中去單獨創建核心線程(類似惰性創建)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) {
// 其它工作線程在迴圈的過程中也在消費工作線程,且用戶也可能不斷地提交任務
// 這是一個動態的過程,但一旦發現當前工作隊列為空則立即結束
break;
}
}
}
}
/**
* 動態更新最大線程數maximumPoolSize
* */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
// 更新完畢後,發現當前工作線程數超過了指定的值
// 喚醒所有idle線程,讓目前空閑的idle超時的線程在workerCount大於maximumPoolSize時及時銷毀
interruptIdleWorkers();
}
}
目前為止,通過v1版本的MyThreadPoolExecutor源碼,已經將jdk線程池ThreadPoolExecutor在RUNNING狀態下提交任務,啟動工作線程執行任務相關的核心邏輯講解完畢了(不考慮優雅停止)。
jdk線程池預設支持的四種拒絕策略
jdk線程池支持用戶傳入自定義的拒絕策略處理器,只需要傳入實現了RejectedExecutionHandler介面的對象就行。
而jdk在ThreadPoolExecutor中提供了預設的四種拒絕策略方便用戶使用。
- AbortPolicy
拒絕接受任務時會拋出RejectedExecutionException,能讓提交任務的一方感知到異常的策略。適用於大多數場景,也是jdk預設的拒絕策略。 - DiscardPolicy
直接丟棄任務的拒絕策略。簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合 - DiscardOldestPolicy
丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交。適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性) - CallerRunsPolicy
令調用者線程自己執行所提交任務的拒絕策略。線上程池壓力過大時,讓提交任務的線程自己執行該任務(非同步變同步),能有效地降低線程池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低。
上面介紹的四種jdk預設拒絕策略分別適應不同的業務場景,需要用戶仔細考慮最適合的拒絕策略。同時靈活的、基於介面的設計也開放的支持用戶去自己實現更貼合自己業務的拒絕策略處理器。
/**
* 預設的拒絕策略:AbortPolicy
* */
private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
/**
* 拋出RejectedExecutionException的拒絕策略
* 評價:能讓提交任務的一方感知到異常的策略,比較通用,也是jdk預設的拒絕策略
* */
public static class MyAbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 直接拋出異常
throw new RejectedExecutionException("Task " + command.toString() +
" rejected from " + executor.toString());
}
}
/**
* 令調用者線程自己執行command任務的拒絕策略
* 評價:線上程池壓力過大時,讓提交任務的線程自己執行該任務(非同步變同步),
* 能夠有效地降低線程池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低
* */
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前線程池不是shutdown狀態,則令調用者線程自己執行command任務
command.run();
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
/**
* 直接丟棄任務的拒絕策略
* 評價:簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合
* */
public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 什麼也不做的,直接返回
// 效果就是command任務被無聲無息的丟棄了,沒有異常
}
}
/**
* 丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交
* 評價:適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性)
* */
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前線程池不是shutdown狀態,則丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交
executor.getQueue().poll();
executor.execute(command);
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
jdk預設的四種線程池實現
jdk中除了提供了預設的拒絕策略,還在Executors類中提供了四種基於ThreadPoolExecutor的、比較常用的線程池,以簡化用戶對線程池的使用。
這四種線程池可以通過Executors提供的public方法來分別創建:
newFixedThreadPool
newFixedThreadPool方法創建一個工作線程數量固定的線程池,其創建Thr