面試官: 小伙子,我看你簡歷上寫的項目中用到了線程池,你知道線程池是怎樣實現復用線程的? 這面試官是不是想坑我?是不是擺明瞭不讓我通過? 難道你不應該問線程池有哪些核心參數?每個參數具體作用是什麼? ...
背景介紹:
你剛從學校畢業後,到新公司實習,試用期又被畢業,然後你又不得不出來面試,好在面試的時候碰到個美女面試官!
面試官: 小伙子,我看你簡歷上寫的項目中用到了線程池,你知道線程池是怎樣實現復用線程的?
這面試官是不是想坑我?是不是擺明瞭不讓我通過?
難道你不應該問線程池有哪些核心參數?每個參數具體作用是什麼?
往線程池中不斷提交任務,線程池的處理流程是什麼?
這些才是你應該問的,這些八股文我已經背熟了,你不問,瞎問什麼復用線程?
幸虧我看了一燈的八股文,聽我給你背一遍!
我: 線程池復用線程的邏輯很簡單,就是線上程啟動後,通過while死迴圈,不斷從阻塞隊列中拉取任務,從而達到了復用線程的目的。
具體源碼如下:
// 線程執行入口
public void run() {
runWorker(this);
}
// 線程運行核心方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 使用while死迴圈,不斷從阻塞隊列中拉取任務
while (task != null || (task = getTask()) != null) {
// 加鎖,保證thread不被其他線程中斷(除非線程池被中斷)
w.lock();
// 2. 校驗線程池狀態,是否需要中斷當前線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 3. 執行run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法邏輯很簡單,就是不斷從阻塞隊列中拉取任務並執行。
面試官: 小伙子,有點東西。我們都知道線程池會回收超過空閑時間的線程,那麼線程池是怎麼統計線程的空閑時間的?
美女面試官的問題真刁鑽,讓人頭疼啊!這問的也太深了吧!
沒看過源碼的話,真不好回答。
我: 嗯...,可能是有個監控線程在後臺不停的統計每個線程的空閑時間,看到線程的空閑時間超過閾值的時候,就回收掉。
面試官: 小伙子,你的想法挺不錯,邏輯很嚴謹,你確定線程池內部是這麼實現的嗎?
問得我有點不自信了,沒看過源碼不能瞎蒙。
我還是去瞅一眼一燈寫的八股文吧。
我: 這個我知道,線程池統計線程的空閑時間的實現邏輯很簡單。
阻塞隊列(BlockingQueue)提供了一個poll(time, unit)
方法用來拉取數據,
作用就是: 當隊列為空時,會阻塞指定時間,然後返回null。
線程池就是就是利用阻塞隊列的這個方法,如果在指定時間內拉取不到任務,就表示該線程的存活時間已經超過閾值了,就要被回收了。
具體源碼如下:
// 從阻塞隊列中拉取任務
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 如果線程池已經停了,或者阻塞隊列是空,就回收當前線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 再次判斷是否需要回收線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3. 在指定時間內,從阻塞隊列中拉取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 4. 如果沒有拉取到任務,就標識該線程已超時,然後就被回收
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
面試官: 小伙子,可以啊,你是懂線程池源碼的。再問你個問題,如果線程池拋異常了,也沒有try/catch,會發生什麼?
美女面試官你這是準備打破砂鍋問到底,鐵了心不讓我過,是吧?
我的代碼風格是很嚴謹的,誰寫的業務代碼不try/catch,也沒遇到過這種情況。
讓我再看一下一燈總結的八股文吧。
我: 有了,線程池中的代碼如果拋異常了,也沒有try/catch,會從線程池中刪除這個異常線程,並創建一個新線程。
不信的話,我們可以測試驗證一下:
/**
* @author 一燈架構
* @apiNote 線程池示例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
// 1. 創建一個單個線程的線程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 2. 往線程池中提交3個任務
for (int i = 0; i < 3; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關註公眾號:一燈架構");
throw new RuntimeException("拋異常了!");
});
}
// 3. 關閉線程池
executorService.shutdown();
}
}
輸出結果:
pool-1-thread-1 關註公眾號:一燈架構
pool-1-thread-2 關註公眾號:一燈架構
pool-1-thread-3 關註公眾號:一燈架構
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-3" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
從輸出結果中可以看出,線程名稱並不是同一個,而是累加的,說明原線程已經被回收,新建了個線程。
我們再看一下源碼,驗證一下:
// 線程拋異常後,退出邏輯
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 1. 從工作線程中刪除當前線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 2. 中斷當前線程
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 3. 新建一個線程
addWorker(null, false);
}
}
如果想統一處理異常,可以自定義線程創建工廠,在工廠裡面設置異常處理邏輯。
/**
* @author 一燈架構
* @apiNote 線程池示例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
// 1. 創建一個單個線程的線程池
ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
// 2. 自定義線程創建工廠,並設置異常處理邏輯
Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((t, e) -> {
System.out.println("捕獲到異常:" + e.getMessage());
});
return thread;
});
// 3. 往線程池中提交3個任務
for (int i = 0; i < 3; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關註公眾號:一燈架構");
throw new RuntimeException("拋異常了!");
});
}
// 4. 關閉線程池
executorService.shutdown();
}
}
輸出結果:
Thread-0 關註公眾號:一燈架構
捕獲到異常:拋異常了!
Thread-1 關註公眾號:一燈架構
捕獲到異常:拋異常了!
Thread-2 關註公眾號:一燈架構
捕獲到異常:拋異常了!
面試官: 小伙子,論源碼,還是得看你,還是你背的熟。現在我就給你發offer,薪資直接漲10%,明天9點就來上班吧,咱們公司實行996工作制。
我是「一燈架構」,如果本文對你有幫助,歡迎各位小伙伴點贊、評論和關註,感謝各位老鐵,我們下期見