線程池ThreadPoolExecutor ThreadPoolExecutor 繼承結構 繼承結構如圖所示:ThreadPoolExecutor <- AbstractExecutorService <- ExecutorService <- Executor public class Threa ...
線程池ThreadPoolExecutor
ThreadPoolExecutor 繼承結構
繼承結構如圖所示:ThreadPoolExecutor <- AbstractExecutorService <- ExecutorService <- Executor
![](https://typora-pictures-1316842725.cos.ap-shanghai.myqcloud.com/images/20230410235937.png)
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
}
/**
* 實現了部分 ExecutorService 方法
* 1. submit 方法
* 2. invokeAny 方法
* 3. invokeAll 方法
*/
public abstract class AbstractExecutorService implements ExecutorService {
/**
* Callable -> FutureTask
* FutureTask<V> implements RunnableFuture<V>
* RunnableFuture<V> extends Future<V>, Runnable
*
* FutureTask Status:
* NEW(0): 初始狀態, 任務剛被創建或者正在計算中
* COMPLETING(1): 中間狀態, 任務計算完成正在對結果進行賦值,或者正在處理異常
* NORMAL(2): 終止狀態, 任務計算完成, 結果已經完成賦值
* EXCEPTIONAL(3): 終止狀態, 任務計算過程發生異常無法處理,線程中斷
* CANCELLED(4): 終止狀態, 任務計算過程被取消
* INTERRUPTING(5): 中間狀態, 任務計算過程已開始並被中斷,正在修改狀態
* INTERRUPTED(6): 終止狀態,任務計算過程已開始並被中斷,且已經完全停止
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 提交 callable 任務
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交 runnable 任務,返回 null
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交 runnable 任務,返回 result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// invokeAll
// 為每一個任務創建對應的FutureTask, 並調用 execute 方法執行
// execute() 方法在 ThreadPoolExecutor 被實現
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 如何任務此時還未執行完成,則阻塞獲取對應的值
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
// 執行過程拋出無法處理的異常
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
// 取消任務的執行,如果任務已經執行完成,則不受影響
futures.get(i).cancel(true);
}
}
// InvokeAny 方法邏輯待後續更新
}
/**
* 在 Executor 的基礎上定義了一系列任務執行和線程池管理方法
*
* 1. submit: 提供方法執行帶有返回值的任務
* 2. invokeAll: 提供方法執行指定的任務集合中的所有任務, 返回 List<Future<T>>
* 3. invokeAny: 提供方法執行指定的任務集合中的所有任務, 將第一個執行完成的任務的結果作為返回值, 並終止其他線程的執行
* 4. isShutDown/isTerminated: 判斷線程池狀態方法
* 5. shutdown: 不再接受新的任務, 待所有任務執行完畢後關閉線程池
* 6. shutdownNow: 不再接受新的任務,直接關閉線程池
*/
public interface ExecutorService extends Executor {
// ...
}
/**
* 只定義了一個 execute 方法, 執行 Runnable 任務
*/
public interface Executor {
void execute(Runnable command);
}
ThreadPoolExecutor 關鍵參數及核心方法
關鍵參數
線程池狀態參數
public class ThreadPoolExecutor extends AbstractExecutorService {
// 線程池狀態,由兩部分構造 runState | workerCount
// runState: 占2bit(29~30位)
// workerCount: 占29bit(0~28位)
// 符號位: 占1bit(最高位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// workerCount 最大容量: 2^29 - 1
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 線程池狀態
* RUNNING: 運行狀態,接受新任務,處理阻塞隊列中的任務
* SHUTDOWN: 關閉狀態,拒絕新任務,處理阻塞隊列中的任務
* STOP: 停止狀態,拒絕新任務,並中斷當前正在執行的任務,不處理阻塞隊列中的任務直接關閉
* TIDYING: 過度狀態,當前線程池中的活動線程數降為0時的狀態
* TERMINATED: 銷毀狀態,線程池徹底終止
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
線程池狀態轉移圖如下所示
![](https://typora-pictures-1316842725.cos.ap-shanghai.myqcloud.com/images/20230411172803.png)
- RUNNING: 線程池創建後進入的狀態
- SHUTDOWN: 調用
shutdown
方法進入該狀態,該方法主要包含如下操作- 更新線程池狀態為
SHUTDOWN
- 中斷空閑線程
interruptIdleWorkers()
- 所以已經存在任務隊列中的任務還是能被正常執行完成
- 執行完所有任務後,先清除所有的worker,然後調用
tryTerminate()
,進入TIDYING
狀態
- 更新線程池狀態為
- STOP: 調用
shutdownNow()
方法進入該狀態,該方法主要包含如下操作- 更新線程池狀態為
STOP
- 中斷所有線程
interruptWorkers()
- 清空任務隊列
drainQueue()
- 立即調用
tryTerminate()
進入TIDYING
狀態
- 更新線程池狀態為
- TIDYING: 調用
terminated()
方法 - TERMINATED: 執行完
terminated()
方法進入該狀態- ctl.set(ctlOf(TERMINATED, 0))
線程池管理參數
public class ThreadPoolExecutor extends AbstractExecutorService {
// 任務隊列
private final BlockingQueue<Runnable> workQueue;
// 工作線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 線程池到達過的最大線程數量
private int largestPoolSize;
// 已完成任務數
private long completedTaskCount;
// 線程工廠,用於創建線程
private volatile ThreadFactory threadFactory;
// 拒絕策略處理類
private volatile RejectedExecutionHandler handler;
// 線程池中線程數量 > corePoolSize 情況下,空閑線程的最大存活時間
private volatile long keepAliveTime;
// true: 線程數量 <= corePoolSize 情況下,空閑線程的最大存活時間也設置為 keepAliveTime
// false(default): 線程數量 <= corePoolSize 情況下,空閑線程可以一直存活
private volatile boolean allowCoreThreadTimeOut;
// 設置線程池 —— 核心線程數
private volatile int corePoolSize;
// 設置線程池 —— 最大線程數
private volatile int maximumPoolSize;
// 預設任務拒絕策略: 拋出異常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
核心方法
構造函數
// corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue 必須手動設置
// threadFactory, handler 可以使用預設設置
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute() 方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workerCount < corePoolSize,則直接添加一個 worker 執行該任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// workerCount >= corePoolSize, 則先嘗試將任務添加到 workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任務添加到 workQueue 後,執行recheck
// 如果線程池未處於 Running 狀態,則將剛剛添加的任務從阻塞隊列中刪除
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果線程池處於 Running 狀態,則判斷是否需要添加一個新的 worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// workerCount >= corePoolSize, 並且任務隊列已滿,添加失敗
// 則嘗試增加一個新的 worker 執行該任務
// 如果添加失敗,則調用拒絕策略處理類
else if (!addWorker(command, false))
reject(command);
}
execute
提交新任務的處理策略總結如下:
workerCount < corePoolSize
: 直接添加一個新的 worker 執行任務workerCount >= corePoolSize
: 嘗試添加到任務隊列- 添加成功則執行
recheck
; - 添加失敗則嘗試創建一個新的 worker 來執行該任務,創建worker失敗則調用拒絕策略處理
- 添加成功則執行
addWorker() 方法
該方法用於添加一個新的 Worker 到線程池中,包括兩個參數:
- firstTask(Runnable): 創建完成後第一個執行的任務
- core(boolean):
- true: 使用 corePoolSize 為最大線程數量
- false: 使用 maxPoolSize 為最大線程數量
private boolean addWorker(Runnable firstTask, boolean core) {
// 迴圈標簽,方便跳出
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判斷線程池狀態:以下狀態才能添加 worker
* 1. 線程池處於 RUNNING 狀態
* 2. 線程池處於 SHUTDOWN 狀態 且 firstTask 為 null 且 workQueue 不為空
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判斷當前 worker 數量是否還能繼續添加
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 更新 workerCount
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 更新失敗則自旋重試
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// worker 啟動標識
boolean workerStarted = false;
// worker 加入 HashSet 集合標識
boolean workerAdded = false;
Worker w = null;
try {
// Worker構造方法調用 threadFactory 創建新的線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖,保證多個線程同時添加 worker 到集合中的安全性
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 判斷該線程是否已經啟動
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 啟動線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// worker 啟動失敗,則做一些回退處理
// 從 workers 集合中刪除 worker
// workCount 減少1
addWorkerFailed(w);
}
return workerStarted;
}
Worker
Worker
類實現了Runnable
介面,所以在創建線程中可以傳入自己作為任務,然後線程啟動時調用自己的run()
方法
Worker
類繼承自AQS,所以其本身也是一把鎖(不可重入鎖),在執行任務時通過lock()
鎖住自己,保證worker正在執行時不會去獲取其他任務來執行
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 傳入自己作為 Runnable 實例
// 線程啟動時執行 Worker.run() 方法
this.thread = getThreadFactory().newThread(this);
}
// run() 則調用外部 ThreadPoolExecutor 的 runWorker 方法
public void run() {
runWorker(this);
}
}
runWorker() 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 初始任務
Runnable task = w.firstTask;
// firstTask 執行過一次後被置為 null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 迴圈獲取任務執行,復用已有線程
// getTask() 從任務隊列獲取task
while (task != null || (task = getTask()) != null) {
w.lock();
// 若線程池處於 STOP 狀態,但線程沒有中斷執行,則調用 interrupt() 方法完成中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法,任務執行前邏輯
// 預設實現為空,可自定義線程池擴展該功能
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法,任務執行後邏輯
// 預設實現為空,可自定義線程池擴展該功能
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 刪除 worker,線程執行完畢
processWorkerExit(w, completedAbruptly);
}
}
getTask() 方法
從
workQueue
中獲取任務,返回 Runnable 任務或者 null
return Runnable
: worker正常執行return null
: 獲取不到任務,進入 processWorkerExit 結束當前 worker
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判斷是否回收當前線程:
* 情況1. 線程池狀態為 SHUTDOWN && workQueue 為空
* 情況2. 線程池狀態為 STOP || TERMINATED
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// true: poll()獲取任務,阻塞獲取,設置超時時間
// false: take()獲取任務,阻塞獲取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 判斷是否回收當前線程:
* 條件1. workerCount > maxPoolSize 或 當前線程獲取任務超時
* 條件2. workerCount > 1 或 workQueue 為空
*
* 同時滿足條件1和條件2,則CAS減少workerCount,並返回null
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 不滿足回收當前線程的條件,則執行後續獲取任務的邏輯
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit() 方法
從 workers 工作線程集合中刪除當前 worker,回收線程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是異常退出,則需要手動完成 workerCount 的更新
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.如果是異常退出則直接添加一個新的 worker
// 2.如果 workerCount < 最小線程數要求,則添加一個新的 worker
addWorker(null, false);
}
}
總結
創建線程池提交任務,整體執行流程如下圖所示:
- execute(): 提交 Runnable Task
- submit(): 提交 Callable Task
- wc: workerCount, 線程數量
- rs: runState, 線程池運行狀態
- reject: 執行任務拒絕策略