1.創建線程池相關參數 線程池的創建要用ThreadPoolExecutor類的構造方法自定義創建,禁止用Executors的靜態方法創建線程池,防止記憶體溢出和創建過多線程消耗資源。 corePoolSize: 線程池核心線程數量,不會自動銷毀,除非設置了參數allowCoreThreadTimeO ...
1.創建線程池相關參數
線程池的創建要用ThreadPoolExecutor類的構造方法自定義創建,禁止用Executors的靜態方法創建線程池,防止記憶體溢出和創建過多線程消耗資源。
corePoolSize: 線程池核心線程數量,不會自動銷毀,除非設置了參數allowCoreThreadTimeOut=true,那麼即使當線程數量小於corePoolSize的時候,當線程
空閑時間大於keepAliveTime,也會被回收
maximumPoolSize: 線程池能容納的最大線程數量
keepAliveTime: 一般情況下核心線程不可回收,非核心線程空閑時間大於此時間會被回收
unit: keepAliveTime單位
workQueue: 工作隊列,當線程池數量等於corePoolSize的時候,此時任務會先進入到隊列,其他線程執行完任務後會從該隊列獲取任務繼續執行
threadFactory: 線程工廠,用來創建線程池的線程同時也可以指定線程的名字以及其他屬性
handler: 當線程池線程已滿執行拒絕策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.核心方法源碼分析
2.1 execute詳解
// 線程池執行核心方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 當線程池線程數量小於corePoolSize時,直接嘗試添加一個線程並把command當做這個線程的第一個任務
if (workerCountOf(c) < corePoolSize) {
// 若添加失敗,則線程池數量不符合或者線程池狀態發生變化此時繼續往下執行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 若線程池狀態為Running則把任務添加到阻塞隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 雙重校驗線程池狀態,因為線程池狀態可能在任務剛添加進隊列的時候就發生改變,此時需要從隊列中移除該任務並執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 到這表明線程池此時狀態時Running,然後判斷線程池線程數量,使其至少有一個線程能夠執行任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 隊列已滿添加失敗,嘗試添加線程執行任務若添加失敗(線程池線程數量已達到maximumPoolSize或者線程池狀態shutdown等)則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
2.2 addWorker詳解
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 線程狀態
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 第二個與條件分解 rs >= SHUTDOWN && (rs > SHUTDOWN || firstTask != null || workQueue.isEmpty())
// 有三種情況線程池不增加線程
// 1.線程池狀態STOP / TIDYING / TERMINATED
// 2.線程池狀態SHUTDOWN但此時已不接受新提交到線程池的任務
// 3.線程池狀態SHUTDOWN此時工作隊列無任務 (由此可見在SHUTDOWN狀態下只要隊列中還存在任務那麼線程池還會增加線程處理)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 根據增加核心線程還是非核心線程 判斷線程池中線程數量是否符合
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 線程數量+ 1 跳出迴圈
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創建Worker工作者
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 線程池狀態為RUNNING或者 線程池狀態為SHUTDOWN但沒有新提交任務(此時增加線程是為了處理阻塞隊列中的任務)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 線程不可能已經激活
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動線程開始執行任務 Worker$run()方法
t.start();
workerStarted = true;
}
}
} finally {
// 啟動失敗 workers移除線程數量-1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.3 runworker詳解
// 上一個方法t.start()運行後就會執行這個方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// state = 0, 使得線程可中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 執行線程自帶的第一個任務或者從任務隊列中獲取的任務
while (task != null || (task = getTask()) != null) {
// 獲取獨占鎖 開始運行任務
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 若線程池狀態為STOP時,確保線程有設置中斷狀態,若線程池狀態為RUNING和SHUTDOWN,則會清除線程的中斷狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置鉤子方法可重寫
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 運行任務
// 由上所知線程池為STOP會給線程設置中斷狀態,若任務代碼邏輯有對中斷的相關處理可能會直接拋出中斷異常
// shutdownNow()方法會讓線程池放棄隊列中以及正在運行中的任務(若任務中沒有對中斷進行處理則會繼續運行)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 後置鉤子方法可重寫
afterExecute(task, thrown);
}
} finally {
// 清空任務 worker完成任務數+1
task = null;
w.completedTasks++;
// 解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2.4 getTask詳解
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
// 是否超時 keepAliveTime
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態為STOP或者為SHUTDOWN並且任務隊列為空則不在執行任務,直接回收線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 迴圈cas直到成功
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 線程是否空閑回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 線程池線程數超過最大限制或者當前線程空閑時間已經超過keepAliveTime
// 並且線程池線程數大於1或者隊列為空那麼代表此線程可以回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// cas設置線程數減1 cas失敗表示線程池狀態變化或者其他線程先一步回收使得線程池線程已經減1了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 是否空閑回收, poll限制從隊列獲取任務超時返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 未取到說明線程已空閑keepAliveTime時間 超時可回收
timedOut = true;
} catch (InterruptedException retry) {
// 若線程使用take()或poll()且隊列中沒有任務,則當調用shutdown()/shutdownNow()時會給線程設置中斷狀態
// 此時會拋出中斷異常、並且線程池狀態可能已經發生了變化此時開始下一輪迴圈
timedOut = false;
}
}
}
2.5 processWorkerExit詳解
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// run方法異常 線程數量直接-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試終止線程池,設置最終狀態
tryTerminate();
int c = ctl.get();
// 線程池狀態為RUNNING或SHUTDONW
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// min為線程池最小不可回收線程數
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 增加一個線程
addWorker(null, false);
}
}
2.6 tryTerminate詳解
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 三種情況什麼都不做直接返回
// 1.線程池狀態還為RUNNING
// 2.線程池狀態為TIDYING或TERMINATED已終止
// 3.線程池狀態為SHUTDOWN但阻塞隊列中還有任務
// 當狀態為STOP或者為SHUTDOW且隊列為空才會往下執行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 線程池是否還存線上程 嘗試中斷一個線程,進行傳播
// 如何理解? 當一個線程從等待中中斷後,getTask()返回null,後續會執行processWorkerExit,
// 而processWorkerExit里的tryTerminate會再次嘗試終止線程池或再中斷一個線程以達到傳播的目的,妙哉
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 到這裡說明線程池裡的線程都已經回收了,可以嘗試終止線程池了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 線程池設置終止狀態了
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
2.7 interruptIdleWorkers詳解
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.(向其他正在等待的線程傳遞SHUTDOWN關閉信號)
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 線程非中斷並且嘗試獲取到了worker的鎖則給線程設置中斷狀態
// w.tryLock() = true,說明線程t沒有執行任務空閑,可能在getTask()中阻塞等待任務
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
2.8 shutdown詳解
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
// 嘗試停止線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀態設置為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷所有空閑的線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
2.9 shutdownNow詳解
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
// 取消執行隊列中的和正在執行中的任務
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀態設置為STOP
advanceRunState(STOP);
// 中斷線程池中的所有線程,即使是在執行任務中的線程
interruptWorkers();
// 獲取隊列中的任務並且清空隊列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池,若線程池為空則終止完成,否則依賴傳播來終止線程池
tryTerminate();
return tasks;
}
3.問題
3.1 線程池被 shutdown 後,還能產生新的線程?
線程池被shutdown後在阻塞隊列還有任務的情況下會產生新的線程,見addWorker方法詳解
3.2 線程把任務丟給線程池後肯定就馬上返回了?
線程把任務丟給線程池後不是立即返回,當線程池線程數量小於核心線程數的時候會接著創建一個線程並直接運行這個提交到線程池的任務,當線程池線程數量等於核心線程數會將任務添加到阻塞隊列(若添加失敗則使用拒絕策略,添加成功會判斷當前線程池是否有線程存在,不存在則創建一個線程),若任務隊列已滿則嘗試創建一個非核心線程運行此任務。參考execute方法
3.3 線程池裡的線程異常後會再次新增線程嗎,如何捕獲這些線程拋出的異常?
線程池裡的線程運行異常會拋出異常且會將該worker移除,若線程池狀態不為STOP則會再次新增線程。
1.在run方法中捕獲
2.通過submit提交任務獲取future,之後調用get()方法若線程有拋出異常則會捕獲到
3.重寫afterExecute()方法,此方法會以runnable和throwable為入參
4.通過Thread的uncaughtExceptionHander處理
3.4 線程池的大小如何設置,如何動態設置線程池的參數
setMaximumPoolSize 若當前線程池線程數量大於設置的maximumPoolSize 會嘗試中斷線程,達到回收多餘線程的目的
setCorePoolSize 若當前線程池線程數量大於設置的corePoolSize會嘗試中斷線程,若設置值大於原先的corePoolSize則會根據隊列中的任務創建合適數量的線程來執行任務
3.5 阿裡 Java 代碼規範為什麼不允許使用 Executors 快速創建線程池?
newFixedThreadPool和newSingleThreadExecutor創建workQueue LinkedBlockingQueue未聲明大小相當於創建無界隊列,若任務數量過多添加到隊列中可能會導致OOM
newCachedThreadPool和newScheduledThreadPool最大線程數設置為Integer.MAX_VALUE也可能導致OOM
3.6 如何優雅關閉線程池
調用shutdown()或shutdownNow()方法關閉線程池後使用awaitTermination方法等待線程池線程和隊列任務清空變為TERMINATED狀態
3.7 使用線程池應該避免哪些問題,能否簡單說下線程池的最佳實踐?
1.線程池執行的任務應該是相互獨立的,如果都在一個線程池裡執行可能會導致死鎖
2.核心任務與非核心任務最好能用多個線程池隔離開來,非核心任務可能過多導致核心任務堆積在隊列中無法及時執行,影響業務
3.線程池各個參數很難一次性確定,可以添加告警,比如三分鐘內隊列任務數都是滿的情況下觸發告警,支持動態調整修改線程池的核心線程數和最大線程數。
4. 結語
本篇文章簡要分析了線程池ThreadPoolExecutor中比較重要的幾個方法以及對幾個常見問題的理解,如果理解的有問題歡迎讀者大佬指出討論,謝謝~