線程池無論是工作還是面試都是必備的技能,但是很多人對於線程池的實現原理卻一知半解,並不瞭解線程池內部的工作原理,今天一燈就帶大家一塊剖析線程池底層實現原理。 ...
線程池無論是工作還是面試都是必備的技能,但是很多人對於線程池的實現原理卻一知半解,並不瞭解線程池內部的工作原理,今天一燈就帶大家一塊剖析線程池底層實現原理。
1. 為什麼要使用線程池
使用線程池通常由以下兩個原因:
-
頻繁創建銷毀線程需要消耗系統資源,使用線程池可以復用線程。
-
使用線程池可以更容易管理線程,線程池可以動態管理線程個數、具有阻塞隊列、定時周期執行任務、環境隔離等。
2. 線程池的使用
/**
* @author 一燈架構
* @apiNote 線程池示例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 創建線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
3,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 2. 往線程池中提交3個任務
for (int i = 0; i < 3; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關註公眾號:一燈架構");
});
}
// 3. 關閉線程池
threadPoolExecutor.shutdown();
}
}
輸出結果:
pool-1-thread-2 關註公眾號:一燈架構
pool-1-thread-1 關註公眾號:一燈架構
pool-1-thread-3 關註公眾號:一燈架構
線程池的使用非常簡單:
- 調用new ThreadPoolExecutor()構造方法,指定核心參數,創建線程池。
- 調用execute()方法提交Runnable任務
- 使用結束後,調用shutdown()方法,關閉線程池。
再看一下線程池構造方法中核心參數的作用。
3. 線程池核心參數
線程池共有七大核心參數:
參數名稱 | 參數含義 |
---|---|
int corePoolSize | 核心線程數 |
int maximumPoolSize | 最大線程數 |
long keepAliveTime | 線程存活時間 |
TimeUnit unit | 時間單位 |
BlockingQueue |
阻塞隊列 |
ThreadFactory threadFactory | 線程創建工廠 |
RejectedExecutionHandler handler | 拒絕策略 |
-
corePoolSize 核心線程數
當往線程池中提交任務,會創建線程去處理任務,直到線程數達到corePoolSize,才會往阻塞隊列中添加任務。預設情況下,空閑的核心線程並不會被回收,除非配置了allowCoreThreadTimeOut=true。
-
maximumPoolSize 最大線程數
當線程池中的線程數達到corePoolSize,阻塞隊列又滿了之後,才會繼續創建線程,直到達到maximumPoolSize,另外空閑的非核心線程會被回收。
-
keepAliveTime 線程存活時間
非核心線程的空閑時間達到了keepAliveTime,將會被回收。
-
TimeUnit 時間單位
線程存活時間的單位,預設是TimeUnit.MILLISECONDS(毫秒),可選擇的有:
TimeUnit.NANOSECONDS(納秒)
TimeUnit.MICROSECONDS(微秒)
TimeUnit.MILLISECONDS(毫秒)
TimeUnit.SECONDS(秒)
TimeUnit.MINUTES(分鐘)
TimeUnit.HOURS(小時)
TimeUnit.DAYS(天)
-
workQueue 阻塞隊列
當線程池中的線程數達到corePoolSize,再提交的任務就會放到阻塞隊列的等待,預設使用的是LinkedBlockingQueue,可選擇的有:
LinkedBlockingQueue(基於鏈表實現的阻塞隊列)
ArrayBlockingQueue(基於數組實現的阻塞隊列)
SynchronousQueue(只有一個元素的阻塞隊列)
PriorityBlockingQueue(實現了優先順序的阻塞隊列)
DelayQueue(實現了延遲功能的阻塞隊列)
-
threadFactory 線程創建工廠
用來創建線程的工廠,預設的是Executors.defaultThreadFactory(),可選擇的還有Executors.privilegedThreadFactory()實現了線程優先順序。當然也可以自定義線程創建工廠,創建線程的時候最好指定線程名稱,便於排查問題。
-
RejectedExecutionHandler 拒絕策略
當線程池中的線程數達到maximumPoolSize,阻塞隊列也滿了之後,再往線程池中提交任務,就會觸發執行拒絕策略,預設的是AbortPolicy(直接終止,拋出異常),可選擇的有:
AbortPolicy(直接終止,拋出異常)
DiscardPolicy(默默丟棄,不拋出異常)
DiscardOldestPolicy(丟棄隊列中最舊的任務,執行當前任務)
CallerRunsPolicy(返回給調用者執行)
4. 線程池工作原理
線程池的工作原理,簡單理解如下:
- 當往線程池中提交任務的時候,會先判斷線程池中線程數是否是核心線程數,如果小於,會創建核心線程並執行任務。
- 如果線程數大於核心線程數,會判斷阻塞隊列是否已滿,如果沒有滿,會把任務添加到阻塞隊列中等待調度執行。
- 如果阻塞隊列已滿,會判斷線程數是否小於最大線程數,如果小於,會繼續創建最大線程數並執行任務。
- 如果線程數大於最大線程數,會執行拒絕策略,然後結束。
5. 線程池源碼剖析
5.1 線程池的屬性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 線程池的控制狀態,Integer長度是32位,前3位用來存儲線程池狀態,後29位用來存儲線程數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程個數所占的位數
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大容量,2^29-1,約5億個線程
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 獨占鎖,用來控制多線程下的併發操作
private final ReentrantLock mainLock = new ReentrantLock();
// 工作線程的集合
private final HashSet<Worker> workers = new HashSet<>();
// 等待條件,用來響應中斷
private final Condition termination = mainLock.newCondition();
// 是否允許回收核心線程
private volatile boolean allowCoreThreadTimeOut;
// 線程數的歷史峰值
private int largestPoolSize;
/**
* 以下是線程池的七大核心參數
*/
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
}
線程池的控制狀態ctl用來存儲線程池狀態和線程個數,前3位用來存儲線程池狀態,後29位用來存儲線程數量。
設計者多聰明,用一個變數存儲了兩塊內容。
5.2 線程池狀態
線程池共有5種狀態:
狀態名稱 | 狀態值 | 狀態含義 | 狀態作用 |
---|---|---|---|
RUNNING | 111 | 運行中 | 線程池創建後預設狀態,接收新任務,並處理阻塞隊列中的任務。 |
SHUTDOWN | 000 | 已關閉 | 調用shutdown方法後處於該狀態,不再接收新任務,處理阻塞隊列中任務。 |
STOP | 001 | 已停止 | 調用shutdownNow方法後處於該狀態,不再新任務,並中斷所有線程,丟棄阻塞隊列中所有任務。 |
TIDYING | 010 | 處理中 | 所有任務已完成,所有工作線程都已回收,等待調用terminated方法。 |
TERMINATED | 011 | 已終止 | 調用terminated方法後處於該狀態,線程池的最終狀態。 |
5.3 execute源碼
看一下往線程池中提交任務的源碼,這是線程池的核心邏輯:
// 往線程池中提交任務
public void execute(Runnable command) {
// 1. 判斷提交的任務是否為null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 2. 判斷線程數是否小於核心線程數
if (workerCountOf(c) < corePoolSize) {
// 3. 把任務包裝成worker,添加到worker集合中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 4. 判斷如果線程數不小於corePoolSize,並且可以添加到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
// 5. 重新檢查線程池狀態,如果線程池不是運行狀態,就移除剛纔添加的任務,並執行拒絕策略
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
// 6. 判斷如果線程數是0,就創建非核心線程(任務是null,會從阻塞隊列中拉取任務)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 7. 如果添加阻塞隊列失敗,就創建一個Worker
else if (!addWorker(command, false))
// 8. 如果創建Worker失敗說明已經達到最大線程數了,則執行拒絕策略
reject(command);
}
execute方法的邏輯也很簡單,最終就是調用addWorker方法,把任務添加到worker集合中,再看一下addWorker方法的源碼:
// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 檢查是否允許提交任務
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
// 2. 使用死迴圈保證添加線程成功
for (; ; ) {
int wc = workerCountOf(c);
// 3. 校驗線程數是否超過容量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 4. 使用CAS修改線程數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 5. 如果線程池狀態變了,則從頭再來
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 6. 把任務和新線程包裝成一個worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 7. 加鎖,控制併發
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 8. 再次校驗線程池狀態是否異常
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 9. 如果線程已經啟動,就拋出異常
if (t.isAlive())
throw new IllegalThreadStateException();
// 10. 添加到worker集合中
workers.add(w);
int s = workers.size();
// 11. 記錄線程數歷史峰值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 12. 啟動線程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
方法雖然很長,但是邏輯很清晰。就是把任務和線程包裝成worker,添加到worker集合,並啟動線程。
5.4 worker源碼
再看一下worker類的結構:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 工作線程
final Thread thread;
// 任務
Runnable firstTask;
// 創建worker,並創建一個新線程(用來執行任務)
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
5.5 runWorker源碼
再看一下run方法的源碼:
// 線程執行入口
public void run() {
runWorker(this);
}
// 線程運行核心方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 如果當前worker中任務是null,就從阻塞隊列中獲取任務
while (task != null || (task = getTask()) != null) {
// 加鎖,保證thread不被其他線程中斷(除非線程池被中斷)
w.lock();
// 2. 校驗線程池狀態,是否需要中斷當前線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 3. 執行run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 4. 從worker集合刪除當前worker
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法邏輯也很簡單,就是不斷從阻塞隊列中拉取任務並執行。
再看一下從阻塞隊列中拉取任務的邏輯:
// 從阻塞隊列中拉取任務
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 如果線程池已經停了,或者阻塞隊列是空,就回收當前線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 再次判斷是否需要回收線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3. 從阻塞隊列中拉取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
6. 總結
今天帶大家一塊詳細剖析了Java線程池的實現原理,是不是非常簡單?
幾百行的方法雖然看著複雜,令人頭疼,只要由淺入深的梳理清理業務邏輯,源碼讀起來也是小菜一碟。
我是「一燈架構」,如果本文對你有幫助,歡迎各位小伙伴點贊、評論和關註,感謝各位老鐵,我們下期見