1 為何要適用線程池 首先我們知道線程對於操作系統來說是一種 珍貴的資源 ,像我們如果每次使用到的時候手動創建,線程執行完 方法後又自動關閉,下次用的時候還得手動創建,這樣無論對於操作系統還是我們來說都是一種 時間 和 資源 的浪費,所以我們可以選擇維護一些線程,這些線程在執行完任務之後繼續執行其他 ...
1 為何要適用線程池
首先我們知道線程對於操作系統來說是一種珍貴的資源,像我們如果每次使用到的時候手動創建,線程執行完run()
方法後又自動關閉,下次用的時候還得手動創建,這樣無論對於操作系統還是我們來說都是一種時間和資源的浪費,所以我們可以選擇維護一些線程,這些線程在執行完任務之後繼續執行其他收到的任務,從而實現資源的復用,這些線程就構成了平時說的線程池。其能帶來許多的好處,比如:
實現線程資源復用。減少手動關閉線程的資源浪費。
一定幅度提升響應速度。線上程池承受範圍內(指還能接收任務的狀態)線程可以直接使用,而不用進行手動創建。
方便線程的管理。把線程集中在一起可以讓我們統一的設置其狀態或者超時的時間等,從而達到我們預期的狀態,此外還能減少OOM的發生,比如說如果我們因為一些失誤操作而導致在某個地方不斷的創建線程,那麼會導致系統奔潰,但是如果使用線程池我們可以設定同時執行任務的線程上限和能接收的最大任務數,可以很好的避免這種情況。(當然這是建立在你的最大線程數和任務數處於合理的範圍內)
我們知道線上程的生命周期中(關於線程生命周期可以看看我的另一篇文章——Java線程狀態和關閉線程的正確姿勢),線程正常執行完run()
方法就結束進入終止狀態了,那麼線程池是如何實現一個線程在執行完一個任務之後不進入死亡而繼續執行其他任務的呢?
2 線程池是如何實現線程復用的
其實大家大概都能猜到其內部肯定是使用一個while
迴圈來不斷獲取任務執行,那我們來看看其內部大概是如何實現的,首先看下execute()
方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 這些花里胡哨的可以不用管,大概邏輯就下方3.1節的線程池工作流程,關鍵的是找到線程的啟動方法start()
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 其線程的啟動方法就放在這裡的addWorker方法中
if (addWorker(command, true))
return;
c = ctl.get();
}
// some code....
}
點進去我們可以看到其實現邏輯,這裡刪減一些邏輯以便更加清晰的看清楚:
private boolean addWorker(Runnable firstTask, boolean core) {
// some code...
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// code...
if (workerAdded) {
/*
* 可以看到線程是從這裡開始啟動的,我們找到t的根源,發現是Worker中的thread對象
* 而我們傳進來的執行也被傳入worker中
*/
t.start();
workerStarted = true;
}
}
} finally {
// ...
}
return workerStarted;
}
再跟進worker
的構造方法中,看到thread
是使用線程工廠創建的,而創建方法則是把自身傳了進去(內部類Worker
實現了Runnable
方法)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
也就是說thread
中runnable
對象就是Worker
本身,調用thread.start()
方法會調用Worker
的run()
方法,那麼在上方使用t.start()
啟動線程後就會調用Worker
的run()
方法,讓我們來看下Worker
的run()
方法做了什麼。
// 調用runWorker()
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
try {
// while迴圈不斷的從隊列中獲取任務執行,直到滿足條件退出迴圈
while (task != null || (task = getTask()) != null) {
w.lock();
try {
// 預設空實現,可以重寫此方法以便線上程執行前執行一些操作
beforeExecute(wt, task);
try {
// 直接調用task的run方法,而task就是我們傳進來的runnable
task.run();
} catch (Exception x) {
thrown = x; throw x;
} finally {
// 同上,鉤子方法
afterExecute(task, thrown);
}
}
}
// other code
}
okay,到這裡我們就知道了線程池是如何實現線程執行完任務復用的,跟我們一開始想的差不多就是使用一個while
迴圈不斷從隊列中獲取任務,顯式的調用任務的run()
方法,直到沒有隊列為空(或者其他錯誤因素而退出迴圈)。
3 線程池是如何工作的
3.1 線程池的工作流程
首先先上一張線程池的工作圖,根據工作圖來理解線程池的工作流程,記住這張工作圖對理解線程池有很好的幫助。
- 在執行
Executor.execute(runnable)
或者submit(runnable/callable)
的時候,檢查此時線程池中的線程數量是否達到核心線程數,如果還沒有,則創建'核心線程'執行任務。(可以理解為線上程池中分為'核心線程'和'最大線程'兩種種類的線程,'最大線程'在空閑一段時間之後就自己關閉,'核心線程'則會一直嘗試獲取工作)- 如果達到核心線程數,那麼檢查隊列是否已滿,如果沒滿,則將任務放入隊列中等待消費。(線上程池中任務和線程不會直接交互,一般都會維護一個阻塞隊列,任務來的時候嘗試放入隊列中,而線程則是統一從隊列中拿取任務執行)
- 如果隊列已滿,那麼檢查線程數量是否達到最大線程數,如果沒有的話則創建'最大線程'執行任務,否則的話則執行拒絕策略。
3.2 如何創建一個線程池
我們先通過較底層的一個類ThreadPoolExecutor
來創建。
public class Test {
/* -----為了便於理解,可以把線程池中的類型分成兩類——[核心線程]和[最大線程]------ */
/* -----可以直接看main方法的例子,再上來看這裡參數的註釋方便理解------ */
/**
* 核心線程數,線程池的核心線程到達這個數值之後接收任務便不再創建線程,
* 而是放入隊列等待消費,直到隊列填滿
*/
private static final int CORE_POOL_SIZE = 1;
/**
* 最大線程數,當隊列被填滿時再接收新的任務的時候就會創建'最大線程'來緩解壓力,
* '最大線程'在空閑一段時間後會消亡,具體的空閑時間取決於下方的KEEP_ALIVE_TIME,
* '最大線程'達到這個數值後便不再創建,舉個例子,核心線程數為1,最大線程數為2,
* 那麼核心線程的數量最多為1,'最大線程'的數量最多為1(最大線程數-核心線程數)
*/
private static final int MAXIMUM_POOL_SIZE = 2;
/** 最大線程的空閑時間,'最大線程'空閑時間達到這個數值時消亡,時間單位為下個參數TimeUnit*/
private static final int KEEP_ALIVE_TIME = 60;
/** 空閑時間的計量單位*/
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
/**
* 任務隊列,為阻塞隊列,阻塞隊列的特點是
* 1.調用take()方法時,若隊列為空則進入阻塞狀態而不是返回空
* 2.調用put()方法時,若隊列已滿則進入阻塞狀態
* 阻塞隊列可以分為數組隊列和鏈表隊列(區別大概就是List和Linked的區別),可以通過設定
* 邊界值的方式來決定隊列中最多可以容納多少任務,如果超出則創建最大線程或者採取拒絕策略
* 如果設定了邊界值則為有界隊列,否則則為無界隊列(無界隊列容易引起OOM,隊列的大小應根據需求制定)
*/
private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new ArrayBlockingQueue<>(1);
/** 線程工廠,由該工廠產生執行任務的線程*/
private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
/**
* 拒絕策略,當已達到最大線程並且隊列已滿的時候對新來任務的處理措施,分為四種,由ThreadPoolExecutor內部類實現
* 1、ThreadPoolExecutor.CallerRunsPolicy 當前線程來執行其任務,也就是說調用executor.execute()的線程執行,
而不是線程池額外提供線程執行
* 2、ThreadPoolExecutor.AbortPolicy 直接拋出RejectedExecutionException異常。
* 3、ThreadPoolExecutor.DiscardPolicy 直接丟棄任務,不會對新來的任務進行任何處理,也不會得到任何反饋。
* 4、ThreadPoolExecutor.DiscardOldestPolicy 丟棄隊列中最老的任務(指的是隊列的第一個任務,調用poll()方法將其丟棄),
然後重新調用executor.execute()方法
*/
private static final RejectedExecutionHandler REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.AbortPolicy();
public static void main(String[] args) throws InterruptedException {
// 創建一個主要線程數為1,最大線程數為2,隊列大小為1的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TIME_UNIT,
BLOCKING_QUEUE,
THREAD_FACTORY,
REJECTED_EXECUTION_HANDLER);
// 此時線程池中沒有任何線程,直接創建一個主要線程來執行
executor.execute(() -> {
try {
System.err.println("execute thread1:" + Thread.currentThread().getName());
// 睡眠1秒,驗證下方的線程放入了隊列中而不是再次創建線程
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 此時主要線程數已經達到最大,新來的任務放入隊列中
executor.execute(() -> {
try {
System.err.println("execute thread2:" + Thread.currentThread().getName());
// 睡眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 再次接收任務時,由於隊列滿了嘗試創建最大線程數來執行
executor.execute(() -> {
try {
System.err.println("execute thread3:" + Thread.currentThread().getName());
// 睡眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 線程池已經處於飽和狀態,再次來任務時採取拒絕策略,這裡採取的是直接報錯
executor.execute(() -> {
try {
System.err.println("execute thread4:" + Thread.currentThread().getName());
// 睡眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
結果圖:
從執行順序可以看出是1->3->2沒有4,也就很好的再現了3.1節的線程池工作的流程,線程池創建核心線程執行任務,核心線程數量達到上限(這裡為1)後將新來的任務放入隊列(這裡大小為1),隊列滿了,嘗試創建最大線程執行任新的任務(這個例子也表明瞭最大線程執行的是新來的任務,而不是在隊列頭的任務),此時線程池已經飽和了,如果再來新任務根據拒絕策略響應處理,這裡選擇報錯,所以不會執行任務4。
這裡由於篇幅原因就不舉例其他拒絕策略了,想要驗證可以自己在本機上測試一下。
3.3 簡單聊聊Executors常見的線程池
在JUC包中提供了一個Executors
框架,裡面提供了快速創建五種線程池的方式——newFixedThreadPool()
、newSingleThreadExecutor()
、newCachedThreadPool()
、newScheduledThreadPool()
和newWorkStealingPool()
(Java1.8之後新增,這裡暫不介紹,後續補充)。
newFixedThreadPool(int n)
:創建一個核心線程數和最大線程數都為n的線程池,也就是說,線程池中只有核心線程,不會有最大線程(最大線程的容量=最大線程數-核心線程數),其使用的隊列為LinkedBlockingQueue
無界隊列,意味著不會拋棄任何任務,也就有可能發生OOM,這是其一大缺點。 這些瞭解一下就行,並不需要記甚至可以忘記,前四種快速創建線程池的實質都是使用
ThreadPoolExecutor
實現,只不過其參數不同罷了,其實現如下,可以看到這些參數決定了它的性質。public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newSingleThreadExecutor()
:實現方面沒什麼好說的,就是上述線程池的n為1,但是唯一不同的是其實現外面又包了一層。public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
這一層的作用就是重寫
finalize()
方法,方便JVM
回收的時候保證關閉線程池,關於finalize
和JVM
回收的可以看下我之前的另一篇 JVM垃圾回收的文章。// 具體實現 static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } // 重寫了finalize()方法,保證線程池回收的時候先執行完剩下的任務 protected void finalize() { super.shutdown(); } }
newCachedThreadPool()
:緩存隊列,沒有核心線程,最大線程數為Integer.MAX_VALUE
,並且採用比較特殊的SynchronousQueue
隊列,這種隊列實質不會占據存儲空間,在任務提交到線程池中,如果沒有任務接收任務,那麼此時就會進入阻塞狀態,同理線程從隊列拿任務也是一樣,不過這裡的最大線程數為Integer.MAX_VALUE
,所以每次來新任務,如果沒有空閑線程就會一直創建線程,也有可能導致OOM。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, // 同步隊列 new SynchronousQueue<Runnable>()); }
newScheduledThreadPool(int n)
:定時線程池,核心線程數為n,最大線程數為Integer.MAX_VALUE
,採用延時隊列DelayedWorkQueue
實現,非常適合定時任務的實現。public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
在《阿裡巴巴開放規範》中也說道不允許使用Executors
創建線程池,原因上面也大致說了,沒有好好的把控資源可能導致OOM的情況。
【強制】線程池不允許使用
Executors
去創建,而是通過ThreadPoolExecutor
的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
4 總結
在日常的需求中我們難免會遇到需要多個任務同時進行的情況,這時就避免不了使用線程,如果還是使用傳統的方法手動創建線程的話,那麼於我們應用於系統而言都是一種資源浪費,所以我們可以考慮維護一些固定的線程重覆利用,這些線程就構成了一個線程池,有效的管理這些線程可以減少損耗並且一定程度提升響應速度。
我們知道線程在調用start()
方法執行完之後就會消亡,那麼線程池是如何實現一直執行任務的呢?我們在execute()
方法中發現了其實現的關鍵是內部類Worker
。Worker
是一個內部類,實現了Runnable
介面,線上程工廠創建線程的時候會將Worker
自身傳進去,那麼線程調用start()
方法的時候必定會調用Worker
的run()
方法,而在Worker
的run()
方法中,則是採用一個while
迴圈不斷的從隊列中獲取任務,然後顯示的執行任務的run()
方法,從而實現一個線程執行多個任務。
接著我們使用圖解直觀的看到線程池的工作流程,並使用一小段代碼來解釋ThreadPoolExecutor
的各項參數的意義所在,並簡單的模擬了整個工作流程。
最後講了幾種快速創建線程池的方法,其本質都是調用ThreadPoolExecutor
的構造方法,只是參數的不同決定了他們的不同性質,所以ThreadPoolExecutor
才是根本,其他的看看就行。同時也講明Executors
創建線程池有出現OOM的隱患,所以建議使用ThreadPoolExecutor
來創建。
若文章有誤,希望大家幫忙指出。
即使昧著良心我也要說,"Java是世界上最好的語言。"