一、序言 Java多線程編程線程池被廣泛使用,甚至成為了標配。 線程池本質是池化技術的應用,和連接池類似,創建連接與關閉連接屬於耗時操作,創建線程與銷毀線程也屬於重操作,為了提高效率,先提前創建好一批線程,當有需要使用線程時從線程池取出,用完後放回線程池,這樣避免了頻繁創建與銷毀線程。 // 任務 ...
一、序言
Java多線程編程線程池被廣泛使用,甚至成為了標配。
線程池本質是池化技術
的應用,和連接池類似,創建連接與關閉連接屬於耗時操作,創建線程與銷毀線程也屬於重操作,為了提高效率,先提前創建好一批線程,當有需要使用線程時從線程池取出,用完後放回線程池,這樣避免了頻繁創建與銷毀線程。
// 任務
Runnable runnable = () -> System.out.println(Thread.currentThread().getId());
在應用中優先選用線程池執行非同步任務,根據不同的場景選用不同的線程池,提高非同步任務執行效率。
1、普通執行
new Thread(runnable).start();
2、線程池執行
Executors.newSingleThreadExecutor().execute(runnable)
二、線程池基礎
(一)核心參數
1、核心參數
線程池的核心參數決定了池的類型,進而決定了池的特性。
參數 | 解釋 | 行為 |
---|---|---|
corePoolSize | 核心線程數 | 池中長期維護的線程數量,不主動回收 |
maximumPoolSize | 最大線程數 | 最大線程數大於等於核心線程數 |
keepAliveTime | 線程最大空閑時間 | 非核心線程最大空閑時間,超時回收線程 |
workQueue | 工作隊列 | 工作隊列直接決定線程池的類型 |
2、參數與池的關係
Executors類預設創建線程池與參數對應關係。
線程池 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 | SynchronousQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newFixedThreadPool | N | N | 0 | LinkedBlockingQueue |
newScheduledThreadPool | N | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
(二)線程池對比
根據使用場景選擇對應的線程池。
1、通用對比
線程池 | 特點 | 適用場景 |
---|---|---|
newCachedThreadPool | 超時未使用的線程回自動銷毀,有新任務時自動創建 | 適用於低頻、輕量級的任務。回收線程的目的是節約線程長時間空閑而占有的資源。 |
newSingleThreadExecutor | 線程池中有且只有一個線程 | 順序執行任務 |
newFixedThreadPool | 線程池中有固定數量的線程,且一直存在 | 適用於高頻的任務,即線程在大多數時間里都處於工作狀態。 |
newScheduledThreadPool | 定時線程池 | 與定時調度相關聯 |
2、拓展對比
維護僅有一個線程的線程池有如下兩種方式,正常使用的情況下,二者差異不大;複雜使用環境下,二者存在細微的差異。用newSingleThreadExecutor方式創建的線程池在任何時刻至多只有一個線程,因此可以理解為用非同步的方式執行順序任務;後者初始化的時候也只有一個線程,使用過程中可能會出現最大線程數超過1的情況,這時要求線性執行的任務會並行執行,業務邏輯可能會出現問題,與實際場景有關。
private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);
(三)線程池原理
線程池主要處理流程,任務提交之後是怎麼執行的。大致如下:
- 判斷核心線程池是否已滿,如果不是,則創建線程執行任務
- 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務放在隊列中
- 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創建線程執行任務
- 如果線程池也滿了,則按照拒絕策略對任務進行處理
(四)提交任務的方式
往線程池中提交任務,主要有兩種方法:提交無返回值的任務和提交有返回值的任務。
1、無返回值任務
execute
用於提交不需要返回結果的任務。
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> System.out.println("hello"));
}
2、有返回值任務
submit()
用於提交一個需要返回果的任務。
該方法返回一個Future
對象,通過調用這個對象的get()
方法,我們就能獲得返回結果。get()
方法會一直阻塞,直到返回結果返回。
我們也可以使用它的重載方法get(long timeout, TimeUnit unit)
,這個方法也會阻塞,但是在超時時間內仍然沒有返回結果時,將拋出異常TimeoutException
。
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Long> future = executor.submit(() -> {
System.out.println("task is executed");
return System.currentTimeMillis();
});
System.out.println("task execute time is: " + future.get());
}
在提交任務時,如果無返回值任務,優先使用
execute
。
(無)關閉線程池
線上程池使用完成之後,我們需要對線程池中的資源進行釋放操作,這就涉及到關閉功能。我們可以調用線程池對象的shutdown()
和shutdownNow()
方法來關閉線程池。
這兩個方法都是關閉操作,又有什麼不同呢?
shutdown()
會將線程池狀態置為SHUTDOWN
,不再接受新的任務,同時會等待線程池中已有的任務執行完成再結束。shutdownNow()
會將線程池狀態置為SHUTDOWN
,對所有線程執行interrupt()
操作,清空隊列,並將隊列中的任務返回回來。
另外,關閉線程池涉及到兩個返回boolean的方法,isShutdown()
和isTerminated
,分別表示是否關閉和是否終止。
三、Executors
Executors
是一個線程池工廠,提供了很多的工廠方法,我們來看看它大概能創建哪些線程池。
// 創建單一線程的線程池
public static ExecutorService newSingleThreadExecutor();
// 創建固定數量的線程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 創建帶緩存的線程池
public static ExecutorService newCachedThreadPool();
// 創建定時調度的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 創建流式(fork-join)線程池
public static ExecutorService newWorkStealingPool();
1、創建單一線程的線程池
任何時候線程池中至多只有一個線程,當線程執行異常終止時會自動創建一個新線程替換。如果既有非同步執行任務的需求又希望任務得以順序執行,那麼此類型線程池是首選。
若多個任務被提交到此線程池,那麼會被緩存到隊列。當線程空閑的時候,按照FIFO的方式進行處理。
2、創建固定數量的線程池
創建核心線程與最大線程數相等的固定線程數的線程池,任何時刻至多有固定數目的線程,當線程因異常而終止時則會自動創建線程替換。
當有新任務加入時,如果池內線程均處於活躍狀態,則任務進入等待隊列中,直到有空閑線程,隊列中的任務才會被順序執行;如果池內有非活躍線程,則任務可以立刻得以執行。
- 如果線程的數量未達到指定數量,則創建線程來執行任務
- 如果線程池的數量達到了指定數量,並且有線程是空閑的,則取出空閑線程執行任務
- 如果沒有線程是空閑的,則將任務緩存到隊列(隊列長度為
Integer.MAX_VALUE
)。當線程空閑的時候,按照FIFO的方式進行處理
3、創建可伸縮的線程池
這種方式創建的線程池,核心線程池的長度為0,線程池最大長度為Integer.MAX_VALUE
。由於本身使用SynchronousQueue
作為等待隊列的緣故,導致往隊列裡面每插入一個元素,必須等待另一個線程從這個隊列刪除一個元素。
- 線程池可維護0到Integer.MAX_VALUE個線程資源,空閑線程預設情況下超過60秒未使用則會被銷毀,長期閑置的池占用較少的資源。
- 當有新任務加入時,如果池中有空閑且尚未銷毀的線程,則將任務交給此線程執行;如果沒有可用的線程,則創建一個新線程執行任務並添加到池中。
4、創建定時調度的線程池
和上面3個工廠方法返回的線程池類型有所不同,它返回的是ScheduledThreadPoolExecutor
類型的線程池。平時我們實現定時調度功能的時候,可能更多的是使用第三方類庫,比如:quartz等。但是對於更底層的功能,我們仍然需要瞭解。
四、手動創建線程池
理論上,我們可以通過Executors
來創建線程池,這種方式非常簡單。但正是因為簡單,所以限制了線程池的功能。比如:無長度限制的隊列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應儘可能地避免。怎麼避免?歸根結底,還是需要我們通過更底層的方式來創建線程池。
拋開定時調度的線程池不管,我們看看ThreadPoolExecutor
。它提供了好幾個構造方法,但是最底層的構造方法卻只有一個。那麼,我們就從這個構造方法著手分析。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
這個構造方法有7個參數,我們逐一來進行分析。
corePoolSize
,線程池中的核心線程數maximumPoolSize
,線程池中的最大線程數keepAliveTime
,空閑時間,當線程池數量超過核心線程數時,多餘的空閑線程存活的時間,即:這些線程多久被銷毀。unit
,空閑時間的單位,可以是毫秒、秒、分鐘、小時和天,等等workQueue
,等待隊列,線程池中的線程數超過核心線程數時,任務將放在等待隊列,它是一個BlockingQueue
類型的對象threadFactory
,線程工廠,我們可以使用它來創建一個線程handler
,拒絕策略,當線程池和等待隊列都滿了之後,需要通過該對象的回調函數進行回調處理
這些參數裡面,基本類型的參數都比較簡單,我們不做進一步的分析。我們更關心的是workQueue
、threadFactory
和handler
,接下來我們將進一步分析。
(一)等待隊列-workQueue
等待隊列是BlockingQueue
類型的,理論上只要是它的子類,我們都可以用來作為等待隊列。
同時,jdk內部自帶一些阻塞隊列,我們來看看大概有哪些。
ArrayBlockingQueue
,隊列是有界的,基於數組實現的阻塞隊列LinkedBlockingQueue
,隊列可以有界,也可以無界。基於鏈表實現的阻塞隊列SynchronousQueue
,不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作將一直處於阻塞狀態。該隊列也是Executors.newCachedThreadPool()
的預設隊列PriorityBlockingQueue
,帶優先順序的無界阻塞隊列
通常情況下,我們需要指定阻塞隊列的上界(比如1024)。另外,如果執行的任務很多,我們可能需要將任務進行分類,然後將不同分類的任務放到不同的線程池中執行。
(二)線程工廠-threadFactory
ThreadFactory
是一個介面,只有一個方法。既然是線程工廠,那麼我們就可以用它生產一個線程對象。來看看這個介面的定義。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
Executors
的實現使用了預設的線程工廠-DefaultThreadFactory
。它的實現主要用於創建一個線程,線程的名字為pool-{poolNum}-thread-{threadNum}
。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
很多時候,我們需要自定義線程名字。我們只需要自己實現ThreadFactory
,用於創建特定場景的線程即可。
(三)拒絕策略-handler
所謂拒絕策略,就是當線程池滿了、隊列也滿了的時候,我們對任務採取的措施。或者丟棄、或者執行、或者其他...
jdk自帶4種拒絕策略,我們來看看。
CallerRunsPolicy
// 在調用者線程執行AbortPolicy
// 直接拋出RejectedExecutionException
異常DiscardPolicy
// 任務直接丟棄,不做任何處理DiscardOldestPolicy
// 丟棄隊列里最舊的那個任務,再嘗試執行當前任務
這四種策略各有優劣,比較常用的是DiscardPolicy
,但是這種策略有一個弊端就是任務執行的軌跡不會被記錄下來。所以,我們往往需要實現自定義的拒絕策略, 通過實現RejectedExecutionHandler
介面的方式。
五、其它
配置線程池的參數
前面我們講到了手動創建線程池涉及到的幾個參數,那麼我們要如何設置這些參數才算是正確的應用呢?實際上,需要根據任務的特性來分析。
- 任務的性質:CPU密集型、IO密集型和混雜型
- 任務的優先順序:高中低
- 任務執行的時間:長中短
- 任務的依賴性:是否依賴資料庫或者其他系統資源
不同的性質的任務,我們採取的配置將有所不同。在《Java併發編程實踐》中有相應的計算公式。
通常來說,如果任務屬於CPU密集型,那麼我們可以將線程池數量設置成CPU的個數,以減少線程切換帶來的開銷。如果任務屬於IO密集型,我們可以將線程池數量設置得更多一些,比如CPU個數*2。
PS:我們可以通過
Runtime.getRuntime().availableProcessors()
來獲取CPU的個數。
線程池監控
如果系統中大量用到了線程池,那麼我們有必要對線程池進行監控。利用監控,我們能在問題出現前提前感知到,也可以根據監控信息來定位可能出現的問題。
那麼我們可以監控哪些信息?又有哪些方法可用於我們的擴展支持呢?
首先,ThreadPoolExecutor
自帶了一些方法。
long getTaskCount()
,獲取已經執行或正在執行的任務數long getCompletedTaskCount()
,獲取已經執行的任務數int getLargestPoolSize()
,獲取線程池曾經創建過的最大線程數,根據這個參數,我們可以知道線程池是否滿過int getPoolSize()
,獲取線程池線程數int getActiveCount()
,獲取活躍線程數(正在執行任務的線程數)
其次,ThreadPoolExecutor
留給我們自行處理的方法有3個,它在ThreadPoolExecutor
中為空實現(也就是什麼都不做)。
protected void beforeExecute(Thread t, Runnable r)
// 任務執行前被調用protected void afterExecute(Runnable r, Throwable t)
// 任務執行後被調用protected void terminated()
// 線程池結束後被調用
六、總結
- 儘量使用手動的方式創建線程池,避免使用
Executors
工廠類 - 根據場景,合理設置線程池的各個參數,包括線程池數量、隊列、線程工廠和拒絕策略
喜歡本文就【♥️推薦♥️】一下,激勵我持續創作。這個Github同樣精彩,收到您的star我會很激動。本文歸檔在專題博客,視頻講解在B站。