上一篇從整體上介紹了 介面,從上一篇我們知道了 框架的最頂層實現是 類, 工廠類中提供的 、`newFixedThreadPool newCachedThreadPool ThreadPoolExecutor ThreadPoolExecutor`線程池的運行過程。 1.線程池狀態 既然要講運行過程
上一篇從整體上介紹了Executor
介面,從上一篇我們知道了Executor
框架的最頂層實現是ThreadPoolExecutor
類,Executors
工廠類中提供的newScheduledThreadPool
、newFixedThreadPool
、newCachedThreadPool
方法其實也只是ThreadPoolExecutor
的構造函數參數不同而已。通過傳入不同的參數,就可以構造出適用於不同應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor
線程池的運行過程。
1.線程池狀態
既然要講運行過程,那麼首先要瞭解下線程池的狀態分為哪些?
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
ThreadPoolExecutor
代碼中定義了上面幾個變數:定義了一個volatile變數runState,以及其他幾個表示狀態的常量。
runState
:初始狀態,表示當前線程池的運行狀態,它的值就是上面的那4個常量值之一
RUNNING
:線程池接受新任務並執行隊列任務中...
SHUTDOWN
:不再接受新任務,但是會繼續執行等待隊列Queued中的任務。當調用了shutdown()方法,會從 RUNNING -> SHUTDOWN
STOP
:不再接受新任務,同時也不執行等待隊列Queued中的任務,並且會嘗試終止正在執行中的任務。當調用了shutdownNow()方法, 會從(RUNNING or SHUTDOWN) -> STOP
TERMINATED
:線程池中所有線程已經停止運行,其他行為同 STOP狀態。
- 當等待隊列和線程池為空時,會從SHUTDOWN -> TERMINATED
- 當線程池為空時,會從STOP -> TERMINATED
2.線程池運行任務
2.1變數介紹
在講解運行過程前,我們先看下ThreadPoolExecutor
中的幾個比較重要的成員變數:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來保存等待中的任務,等待worker線程空閑時執行任務
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 時需要持有這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來保存工作中的執行線程
private volatile long keepAliveTime; //超過corePoolSize外的線程空閑存活之間
private volatile boolean allowCoreThreadTimeOut; //是否對corePoolSize內的線程設置空閑存活時間
private volatile int corePoolSize; //核心線程數
private volatile int maximumPoolSize; //最大線程數(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int poolSize; //線程池中的當前線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來新建線程
private int largestPoolSize; //記錄線程池中出現過的最大線程數大小
private long completedTaskCount; //已經執行完的線程數
這邊重點解釋下 corePoolSize
、maximumPoolSize
、workQueue
兩個變數,這兩個變數涉及到線程池中創建線程個數的一個策略。
corePoolSize
: 這個變數我們可以理解為線程池的核心大小,舉個例子來說明(corePoolSize假設等於10,maximumPoolSize等於20):
- 有一個部門,其中有10(corePoolSize)名工人,當有新任務來了後,領導就分配任務給工人去做,每個工人只能做一個任務。
- 當10個工人都在忙時,新來的任務就要放到隊列(workQueue)中等待。
- 當任務越積累越多,遠遠超過工人做任務的速度時,領導就想了一個辦法:從其他部門借10個工人來,借的數量有一個公式(maximumPoolSize - corePoolSize)來計算。然後把新來的任務分配給借來的工人來做。
- 但是如果速度還是還不急的話,可能就要採取措施來放棄一些任務了(RejectedExecutionHandler)。
等到一定時間後,任務都完成了,工人比較閑的情況下,就考慮把借來的10個工人還回去(根據keepAliveTime判斷) - 也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
2.2線程執行過程
先看下前一篇文章中的一個例子:
ExecutorService executor = Executors.newFixedThreadPool(3);
IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
}));
上面代碼就是新建6個任務,然後扔到線程池中運行,輸出線程名稱,直到運行完畢。其中最核心的方法就是execute()
方法,雖然submit()
也可以執行任務,但它底層也是調用execute()
方法,所以懂了execute()
的實現原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //1.
if (runState == RUNNING && workQueue.offer(command)) { //2.
if (runState != RUNNING || poolSize == 0) //3.
ensureQueuedTaskHandled(command); //4.
}
else if (!addIfUnderMaximumPoolSize(command)) //5.
reject(command); // is shutdown or saturated //6
}
}
上面的代碼看起來邏輯有點複雜,我們一個一個看,首先看上面1位置處:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一個或表達式,它分成兩部分
- 首先判斷當前線程數是否大於等於核心線程數,是的話直接進入if語句塊中,否則判斷第二個部分
- 第二個部分
addIfUnderCorePoolSize(command)
,這個方法是當線程數小於核心線程數時,用來新建線程執行任務(因為線程數小於corePoolSize時,直接新建線程來運行任務,不管當前線程池裡有沒有空閑的線程)。如果新建失敗,那麼進入if語句塊,成功了那麼execute方法就執行結束了,因為線程已經新建成功了,任務已經開始線上程池中運行。
進入if語句塊後,看上面代碼2.if (runState == RUNNING && workQueue.offer(command))
- 判斷當前線程池狀態是否是RUNNING 而且 任務放入等待隊列中成功,那麼直接進入if語句塊
- 否則到代碼5.處
if (!addIfUnderMaximumPoolSize(command))
,判斷新任務用新線程執行是否成功(註:這裡的新線程就是我們上面講的 “借來的工人” maximumPoolSize) - 如果“借來的工人”還是處理不了的話,執行任務拒絕策略
繼續進到代碼塊3 的if語句塊if (runState != RUNNING || poolSize == 0)
, 因為新任務加入到等待隊列中了,這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是的話,應急處理加入的新任務 ensureQueuedTaskHandled(command)
。
我們看下兩個關鍵方法的實現:
##### 1.addIfUnderCorePoolSize
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
首先獲取鎖,因為涉及到線程池狀態的變化。然後再次判斷 if (poolSize < corePoolSize && runState == RUNNING)
,在execute()方法中我們已經判斷過一次,這邊再次判斷是為了防止其他線程又新增了新線程或者調用了shutdown、shutdownNow方法,這邊起到了雙重檢查的一個效果。如果為true
的話,進行t = addThread(firstTask)
新增線程執行任務。addThread方法裡面比較簡單,就是通過線程工廠創建線程thread,然後封裝到Worker對象中,加入到 workers隊列中,並執行線程,可以把Worker對象看成是擁有一個線程的對象。
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();
workerStarted = true;
}
}
return t;
}
這裡在介紹下Worker對象, 它實現了Runnable介面,你把它當成Runnable的一個代理類即可,最終也是執行它的run方法。只要註意一下Worker中的beforeExecute
和afterExecute
方法,這兩個方法在ThreadPoolExecutor中沒有具體實現,用戶可以重寫這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等,而afterExecute方法還有一個Throwable t
參數,用戶可以用來記錄一些異常信息,因為新線程中的異常時捕獲不到的,需要在afterExecute中記錄。
看起來這個是不是和spring 切麵有點像,可以看到 知識都是相通的。
看一下它的run方法:
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { //1
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
註意代碼塊1,可以看到這邊在迴圈獲取任務,並執行,直到任務全部執行完畢。除了第一個任務,其他任務都是通過getTask()
方法去取,這個方法是ThreadPoolExecutor中的一個方法。我們猜一下,整個類中只有任務緩存隊列中保存了任務,應該就是去緩存隊列中取了。
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll(); //取任務
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
//則通過poll取任務,若等待一定的時間取不到任務,則返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中斷處於空閑狀態的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
這裡有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給 空閑線程執行。但是在這裡,並沒有採用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的線程Worker去任務緩存隊列裡面取任務來執行,因為每一個Worker裡面都包含了一個線程thread。
2. addIfUnderMaximumPoolSize
這個方法的實現思想和 addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是線上程 池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
到這裡,大部分朋友應該對任務提交給線程池之後到被執行的整個過程有了一個基本的瞭解,下麵總結一下:
- 首先,要清楚corePoolSize和maximumPoolSize的含義;
- 其次,要知道Worker是用來起到什麼作用的;
- 要知道任務提交給線程池之後的處理策略,這裡總結一下主要有4點:
- 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
- 如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
- 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於 corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
這篇寫完了,後面會介紹一下任務緩存隊列的種類已經緩存的策略以及任務拒絕策略等。如果文章有什麼問題,歡迎大家指正,大家互相溝通,互相學習。