好久不發文章了,難道是因為忙,其實是因為懶。這是一篇關於線程池使用和基本原理的科普水文,如果你經常用到線程池,不知道你的用法標準不標準,是否有隱藏的 OOM 風險。不經常用線程池的同學,還有對幾種線程的使用不甚瞭解的同學可以讀一下此文。 為什麼要使用線程池 雖然大家應該都已經很清楚了,但還是說一下。 ...
好久不發文章了,難道是因為忙,其實是因為懶。這是一篇關於線程池使用和基本原理的科普水文,如果你經常用到線程池,不知道你的用法標準不標準,是否有隱藏的 OOM 風險。不經常用線程池的同學,還有對幾種線程的使用不甚瞭解的同學可以讀一下此文。
為什麼要使用線程池
雖然大家應該都已經很清楚了,但還是說一下。其實歸根結底最主要的一個原因就是為了提高性能。
線程池和資料庫連接池是同樣的道理,資料庫連接池是為了減少連接建立和釋放帶來的性能開銷。而線程池則是為了減少線程建立和銷毀帶來的性能消耗。
以 web 項目為例,有以下兩種情況:
1、每次過來一個請求,都要在服務端創建一個新線程來處理請求,請求處理完成銷毀線程;
2、每次過來一個請求,服務端線上程池中直接拿過一個空閑的線程來處理這個請求,處理完成後還給線程池;
答案是肯定的,肯定是第二種使用線程池的方式性能更好。
除了性能這個最重要的原因外,線程池的使用可以幫助我們更合理的使用系統資源。還是以 web 項目為例,如果我們在服務端不使用線程池,而是無節制的來一個請求創建一個線程,系統資源將會很快被耗盡。而使用線程池的話,則可以防止這種情況發生,當然這要建立在正確合理的使用線程池的基礎上,要固定線程的最大數以及等待隊列的大小。
幾種線程池的使用和原理
線程池固然好用,但是要建立在正確的使用方式的基礎上,如果使用方式不當,同樣會出現問題。接下來就介紹一下幾種線程池的使用。
在大名鼎鼎的 J.U.C 包下已經提供了 Executors 類,它已經封裝實現了四種創建線程池的方式,它暴露出幾個簡單的方法供開發者調用。最終都是通過 new ThreadPoolExecutor() ExecutorService 實例,從而得到我們想要的線程池類型。這樣做其實有利有弊,好的是我們不用關心那麼多參數,只需要簡單的指定一兩個參數就可以;不好的是,這樣一來又屏蔽了很多細節,如果有些參數使用預設的,而開發者又不瞭解原理的情況下,可能會造成 OOM 等問題。
很多公司都不建議或者強制不允許直接使用 Executors 類提供的方法來創建線程池,例如阿裡巴巴Java開發手冊里就明確不允許這樣創建線程池,一定要通過 ThreadPoolExecutor(xx,xx,xx...) 來明確線程池的運行規則,指定更合理的參數。
先來看一下 ThreadPoolExecutor 的幾個參數和它們的意義,先來看一下它最完整參數的重載。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
一共有 7 個參數。
corePoolSize
核心線程數,當有任務進來的時候,如果當前線程數還未達到 corePoolSize 個數,則創建核心線程,核心線程有幾個特點:
1、當線程數未達到核心線程最大值的時候,新任務進來,即使有空閑線程,也不會復用,仍然新建核心線程;
2、核心線程一般不會被銷毀,即使是空閑的狀態,但是如果通過方法 allowCoreThreadTimeOut(boolean value) 設置為 true 時,超時也同樣會被銷毀;
3、生產環境首次初始化的時候,可以調用 prestartCoreThread() 方法來預先創建所有核心線程,避免第一次調用緩慢;
maximumPoolSize
除了有核心線程外,有些策略是當核心線程完全無空閑的時候,還會創建一些臨時的線程來處理任務,maximumPoolSize 就是核心線程 + 臨時線程的最大上限。臨時線程有一個超時機制,超過了設置的空閑時間沒有事兒乾,就會被銷毀。
keepAliveTime
這個就是上面兩個參數里所提到的超時時間,也就是線程的最大空閑時間,預設用於非核心線程,通過 allowCoreThreadTimeOut(boolean value) 方法設置後,也會用於核心線程。
unit
這個參數配合上面的 keepAliveTime ,指定超時的時間單位,秒、分、時等。
workQueue
等待執行的任務隊列,如果核心線程沒有空閑的了,新來的任務就會被放到這個等待隊列中。這個參數其實一定程度上決定了線程池的運行策略,為什麼這麼說呢,因為隊列分為有界隊列和無界隊列。
有界隊列:隊列的長度有上限,當核心線程滿載的時候,新任務進來進入隊列,當達到上限,有沒有核心線程去即時取走處理,這個時候,就會創建臨時線程。(警惕臨時線程無限增加的風險)
無界隊列:隊列沒有上限的,當沒有核心線程空閑的時候,新來的任務可以無止境的向隊列中添加,而永遠也不會創建臨時線程。(警惕任務隊列無限堆積的風險)
threadFactory
它是一個介面,用於實現生成線程的方式、定義線程名格式、是否後臺執行等等,可以用 Executors.defaultThreadFactory() 預設的實現即可,也可以用 Guava 等三方庫提供的方法實現,如果有特殊要求的話可以自己定義。它最重要的地方應該就是定義線程名稱的格式,便於排查問題了吧。
handler
當沒有空閑的線程處理任務,並且等待隊列已滿(當然這隻對有界隊列有效),再有新任務進來的話,就要做一些取捨了,而這個參數就是指定取捨策略的,有下麵四種策略可以選擇:
ThreadPoolExecutor.AbortPolicy:直接拋出異常,這是預設策略;
ThreadPoolExecutor.DiscardPolicy:直接丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後將新來的任務加入等待隊列
ThreadPoolExecutor.CallerRunsPolicy:由線程池所在的線程處理該任務,比如在 main 函數中創建線程池,如果執行此策略,將有 main 線程來執行該任務
雖然並不提倡用 Executors 中的方法來創建線程池,但還是用他們來講一下幾種線程池的原理。
1、newFixedThreadPool
它有兩個重載方法,代碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
建立一個線程數量固定的線程池,規定的最大線程數量,超過這個數量之後進來的任務,會放到等待隊列中,如果有空閑線程,則在等待隊列中獲取,遵循先進先出原則。
創建固定線程數量線程池, corePoolSize 和 maximumPoolSize 要一致,即核心線程數和最大線程數(核心+非核心線程)一致,Executors 預設使用的是 LinkedBlockingQueue 作為等待隊列,這是一個無界隊列,這也是使用它的風險所在,除非你能保證提交的任務不會無節制的增長,否則不要使用無界隊列,這樣有可能造成等待隊列無限增加,造成 OOM。
正確的創建固定線程數線程池的做法是
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
public static ExecutorService createFixedThreadPool() {
int poolSize = 5;
int queueSize = 10;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
return executorService;
}
上面代碼是創建一個 5 個線程的固定數量線程池,這裡線程存活時間沒有作用,所以設置為 0,使用了 ArrayBlockingQueue 作為等待隊列,設置長度為 10 ,最多允許10個等待任務,超過的任務會執行預設的 AbortPolicy 策略,也就是直接拋異常。ThreadFactory 使用了 Guava 庫提供的方法,定義了線程名稱,方便之後排查問題。
2、newSingleThreadExecutor
建立一個只有一個線程的線程池,如果有超過一個任務進來,只有一個可以執行,其餘的都會放到等待隊列中,如果有空閑線程,則在等待隊列中獲取,遵循先進先出原則。使用 LinkedBlockingQueue 作為等待隊列。
這個方法同樣存在等待隊列無限長的問題,容易造成 OOM,所以正確的創建方式參考上面固定數量線程池創建的方式,只是把 poolSize 設置為 1 。
3、newCachedThreadPool
緩存型線程池,在核心線程達到最大值之前,有任務進來就會創建新的核心線程,並加入核心線程池,即時有空閑的線程,也不會復用。達到最大核心線程數後,新任務進來,如果有空閑線程,則直接拿來使用,如果沒有空閑線程,則新建臨時線程。並且線程的允許空閑時間都很短,如果超過空閑時間沒有活動,則銷毀臨時線程。關鍵點就在於它使用 SynchronousQueue 作為等待隊列,它不會保留任務,新任務進來後,直接創建臨時線程處理,這樣一來,也就容易造成無限制的創建線程,造成 OOM。
正確的創建緩存型線程池的做法是
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
public static ExecutorService createCacheThreadPool(){
int coreSize = 10;
int maxSize = 20;
return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
4、newScheduledThreadPool
計劃型線程池,可以設置固定時間的延時或者定期執行任務,同樣是看線程池中有沒有空閑線程,如果有,直接拿來使用,如果沒有,則新建線程加入池。使用的是 DelayedWorkQueue 作為等待隊列,這中類型的隊列會保證只有到了指定的延時時間,才會執行任務。
正確的創建緩存型線程池的做法是
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
Task task = new Task();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
executorService.scheduleAtFixedRate(task,0L,5L, TimeUnit.SECONDS);
latch.await();
}
static class Task implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "executing");
}
}