概述 什麼是線程池? 線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啟動這些任務。 為什麼要用線程池? 降低資源消耗 通過重覆利用已創建的線程降低線程創建和銷毀造成的消耗。 提高響應速度 當任務到達時,任務可以不需要等到線程創建就能立即執行。 提高線程的可管理性 線程是 ...
概述
什麼是線程池?
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啟動這些任務。
為什麼要用線程池?
- 降低資源消耗
- 通過重覆利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度
- 當任務到達時,任務可以不需要等到線程創建就能立即執行。
- 提高線程的可管理性
- 線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。但是要做到合理的利用線程池,必須對其原理瞭如指掌。
Executor 框架
簡介
- Executor:一個介面,其定義了一個接收 Runnable 對象的方法 executor,其方法簽名為 executor(Runnable command),
- ExecutorService:是一個比 Executor 使用更廣泛的子類介面,其提供了生命周期管理的方法,以及可跟蹤一個或多個非同步任務執行狀況返回 Future 的方法。
- AbstractExecutorService:ExecutorService 執行方法的預設實現。
- ScheduledExecutorService:一個可定時調度任務的介面。
- ScheduledThreadPoolExecutor:ScheduledExecutorService 的實現,一個可定時調度任務的線程池。
- ThreadPoolExecutor:線程池,可以通過調用 Executors 以下靜態工廠方法來創建線程池並返回一個 ExecutorService 對象。
ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor
類是 Executor 框架中最核心的一個類。
ThreadPoolExecutor 有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
參數說明
corePoolSize
:線程池的基本線程數。這個參數跟後面講述的線程池的實現原理有非常大的關係。在創建了線程池後,預設情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,從這 2 個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建 corePoolSize 個線程或者一個線程。預設情況下,在創建了線程池後,線程池中的線程數為 0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到 corePoolSize 後,就會把到達的任務放到緩存隊列當中。maximumPoolSize
:線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得註意的是如果使用了無界的任務隊列這個參數就沒什麼效果。keepAliveTime
:線程活動保持時間。線程池的工作線程空閑後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。- unit:參數 keepAliveTime 的時間單位,有 7 種取值。可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
workQueue
:任務隊列。用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。threadFactory
:創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置更有意義的名字。- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按 FIFO (先進先出) 排序元素,吞吐量通常要高於 ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool 使用了這個隊列。
- PriorityBlockingQueue:一個具有優先順序的無限阻塞隊列。
handler
:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是 AbortPolicy,表示無法處理新任務時拋出異常。以下是 JDK1.5 提供的四種策略。- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調用者所線上程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
- 當然也可以根據應用場景需要來實現 RejectedExecutionHandler 介面自定義策略。如記錄日誌或持久化不能處理的任務。
重要方法
在 ThreadPoolExecutor 類中有幾個非常重要的方法:
execute()
方法實際上是 Executor 中聲明的方法,在 ThreadPoolExecutor 進行了具體的實現,這個方法是 ThreadPoolExecutor 的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。submit()
方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現,在 ThreadPoolExecutor 中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和 execute()方法不同,它能夠返回任務執行的結果,去看 submit()方法的實現,會發現它實際上還是調用的 execute()方法,只不過它利用了 Future 來獲取任務執行結果(Future 相關內容將在下一篇講述)。shutdown()
和shutdownNow()
是用來關閉線程池的。
向線程池提交任務
我們可以使用 execute
提交任務,但是 execute
方法沒有返回值,所以無法判斷任務是否被線程池執行成功。
通過以下代碼可知 execute
方法輸入的任務是一個 Runnable 實例。
threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } });
我們也可以使用 submit
方法來提交任務,它會返回一個 Future
,那麼我們可以通過這個 Future
來判斷任務是否執行成功。
通過 Future
的 get
方法來獲取返回值,get
方法會阻塞住直到任務完成。而使用 get(long timeout, TimeUnit unit)
方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理無法執行任務異常 } finally { // 關閉線程池 executor.shutdown(); }
線程池的關閉
我們可以通過調用線程池的 shutdown
或 shutdownNow
方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,然後逐個調用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow 首先將線程池的狀態設置成 STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而 shutdown 只是將線程池的狀態設置成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法的其中一個,isShutdown 方法就會返回 true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用 shutdown 來關閉線程池,如果任務不一定要執行完,則可以調用 shutdownNow。
Executors
JDK 中提供了幾種具有代表性的線程池,這些線程池是基於 ThreadPoolExecutor
的定製化實現。
在實際使用線程池的場景中,我們往往不是直接使用 ThreadPoolExecutor
,而是使用 JDK 中提供的具有代表性的線程池實例。
newCachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
這種類型的線程池特點是:
- 工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為 Interger.MAX_VALUE), 這樣可靈活的往線程池中添加線程。
- 如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(預設為 1 分鐘),則該工作線程將自動終止。終止後,如果你又提交了新的任務,則線程池重新創建一個工作線程。
- 在使用 CachedThreadPool 時,一定要註意控制任務的數量,否則,由於大量線程同時運行,很有會造成系統癱瘓。
示例:
public class CachedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.execute(() -> System.out.println(Thread.currentThread().getName() + " 執行,i = " + index)); } } }
newFixedThreadPool
創建一個指定工作線程數量的線程池。每當提交一個任務就創建一個工作線程,如果工作線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。
FixedThreadPool 是一個典型且優秀的線程池,它具有線程池提高程式效率和節省創建線程時所耗的開銷的優點。但是,線上程池空閑時,即線程池中沒有可運行任務時,它不會釋放工作線程,還會占用一定的系統資源。
示例:
public class FixedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName() + " 執行,i = " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
newSingleThreadExecutor
創建一個單線程化的 Executor,即只創建唯一的工作者線程來執行任務,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。如果這個線程異常結束,會有另一個取代它,保證順序執行。單工作線程最大的特點是可保證順序地執行各個任務,並且在任意給定的時間不會有多個線程是活動的。
示例:
public class SingleThreadExecutorDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName() + " 執行,i = " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
newScheduleThreadPool
創建一個線程池,可以安排任務在給定延遲後運行,或定期執行。
public class ScheduledThreadPoolDemo { private static void delay() { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(() -> System.out.println(Thread.currentThread().getName() + " 延遲 3 秒"), 3, TimeUnit.SECONDS); } private static void cycle() { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate( () -> System.out.println(Thread.currentThread().getName() + " 延遲 1 秒,每 3 秒執行一次"), 1, 3, TimeUnit.SECONDS); } public static void main(String[] args) { delay(); cycle(); } }
源碼
線程池的具體實現原理,大致從以下幾個方面講解:
- 線程池狀態
- 任務的執行
- 線程池中的線程初始化
- 任務緩存隊列及排隊策略
- 任務拒絕策略
- 線程池的關閉
- 線程池容量的動態調整
線程池狀態
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }
runState 表示當前線程池的狀態,它是一個 volatile 變數用來保證線程之間的可見性;
下麵的幾個 static final 變數表示 runState 可能的幾個取值。
當創建線程池後,初始時,線程池處於 RUNNING 狀態;
RUNNING -> SHUTDOWN
如果調用了 shutdown()方法,則線程池處於 SHUTDOWN 狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢。
(RUNNING or SHUTDOWN) -> STOP
如果調用了 shutdownNow()方法,則線程池處於 STOP 狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務。
SHUTDOWN -> TIDYING
當線程池和隊列都為空時,則線程池處於 TIDYING 狀態。
STOP -> TIDYING
當線程池為空時,則線程池處於 TIDYING 狀態。
TIDYING -> TERMINATED
當 terminated() 回調方法完成時,線程池處於 TERMINATED 狀態。
任務的執行
任務執行的核心方法是 execute()
方法。執行步驟如下:
- 如果少於 corePoolSize 個線程正在運行,嘗試使用給定命令作為第一個任務啟動一個新線程。對 addWorker 的調用會自動檢查 runState 和 workerCount,從而防止在不應該的情況下添加線程。
- 如果任務排隊成功,仍然需要仔細檢查是否應該添加一個線程(因為現有的線程自上次檢查以來已經死亡)或者自從進入方法後,線程池就關閉了。所以我們重新檢查狀態,如果有必要的話,線上程池停止狀態時回滾隊列,如果沒有線程的話,就開始一個新的線程。
- 如果任務排隊失敗,那麼我們嘗試添加一個新的線程。如果失敗了,說明線程池已經關閉了,或者已經飽和了,所以拒絕這個任務。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
線程池中的線程初始化
預設情況下,創建線程池之後,線程池中是沒有線程的,需要提交任務之後才會創建線程。
在實際中如果需要線程池創建之後立即創建線程,可以通過以下兩個方法辦到:
prestartCoreThread():初始化一個核心線程; prestartAllCoreThreads():初始化所有核心線程
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //註意傳進去的參數是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//註意傳進去的參數是null ++n; return n; }
任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。
workQueue 的類型為 BlockingQueue,通常可以取下麵三種類型:
- ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
- LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則預設為 Integer.MAX_VALUE;
- SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
任務拒絕策略
當線程池的任務緩存隊列已滿並且線程池中的線程數目達到 maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略
- ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出 RejectedExecutionException 異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重覆此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
線程池的關閉
ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉,分別是 shutdown()和 shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
線程池容量的動態調整
ThreadPoolExecutor 提供了動態調整線程池容量大小的方法:setCorePoolSize()和 setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor 進行線程賦值,還可能立即創建新的線程來執行任務。
免費Java資料需要自己領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分散式等教程,一共30G。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q