1. 線程池相關基本概念 任務(Task):任務是線程池中要執行的工作單元。任務可以是實現了 Runnable 介面或 Callable 介面的對象。Runnable 任務沒有返回值,而 Callable 任務可以返回一個結果。 線程池管理器(ThreadPool Manager):線程池管理器是用 ...
1. 線程池相關基本概念
-
任務(Task):任務是線程池中要執行的工作單元。任務可以是實現了
Runnable
介面或Callable
介面的對象。Runnable
任務沒有返回值,而Callable
任務可以返回一個結果。 -
線程池管理器(ThreadPool Manager):線程池管理器是用於創建和管理線程池的組件。它負責創建線程池,控制線程的創建和銷毀,並調度任務的執行。
-
工作線程(Worker Threads):工作線程是線程池中實際執行任務的線程。線程池中可以有多個工作線程,它們並行地從任務隊列中獲取任務並執行。
-
任務隊列(Task Queue):任務隊列是用於存儲待執行的任務的數據結構。當線程池中的工作線程空閑時,它們會從任務隊列中獲取任務並執行。
-
拒絕策略(Rejection Policy):拒絕策略定義了當任務隊列已滿且無法繼續接收新的任務時,線程池應該如何處理新的任務。常見的拒絕策略包括丟棄任務、丟棄最早的任務、拋出異常等。
-
線程池大小(Pool Size):線程池大小指定了線程池中工作線程的數量。線程池的大小可以是固定的,也可以是根據需要自動調整的。
-
核心線程數(Core Pool Size):核心線程數是線程池中保持活動狀態的最小工作線程數量。即使線程處於空閑狀態,核心線程也不會被銷毀。
-
最大線程數(Maximum Pool Size):最大線程數是線程池中允許的最大工作線程數量。當任務隊列已滿且活動線程數達到最大線程數時,線程池可能會創建新的線程來執行任務。
-
閑置線程回收時間(Keep-Alive Time):閑置線程回收時間是指當線程池中的線程數超過核心線程數,並且空閑一段時間後,多餘的線程會被銷毀。
-
線程工廠(Thread Factory):線程工廠用於創建線程池中的工作線程。它負責創建線程,並可以自定義線程的屬性和命名方式。
線程池的設計目的是提高系統的性能和資源利用率。通過重用線程和控制併發線程的數量,線程池可以減少線程創建和銷毀的開銷,避免資源耗盡,並提供更好的任務調度和執行控制。
在使用線程池時,我們可以根據任務的類型和系統的需求來選擇適當的線程池大小、拒絕策略和其他參數,以實現最佳的性能和可擴展性。
2. 線程池主要處理流程
- 判斷核心線程池是否已滿,如果不是,則創建線程執行任務
- 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務放在隊列中
- 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創建線程執行任務
- 如果線程池也滿了,則按照拒絕策略對任務進行處理
更進一步的裡層核心類處理流程:
當我們使用 Java 中的 ThreadPoolExecutor
類來創建線程池時,其處理流程如下:
-
任務提交:外部調用者通過調用
ThreadPoolExecutor
的execute()
或submit()
方法將任務提交給線程池。 -
任務接收:
ThreadPoolExecutor
接收到任務後,首先檢查線程池的狀態。如果線程池已經關閉,就不再接收新的任務。 -
任務排隊:線程池將接收到的任務放入內部的任務隊列中等待執行。任務隊列可以是有界隊列(如
ArrayBlockingQueue
)或無界隊列(如LinkedBlockingQueue
或SynchronousQueue
)。 -
工作線程獲取任務:線程池中的工作線程從任務隊列中獲取任務。如果任務隊列為空,工作線程可能會阻塞等待新任務的到來,或者在等待一定時間後退出。
-
任務執行:工作線程獲取到任務後,調用任務對象的
run()
方法執行任務的具體邏輯。 -
任務完成:任務執行完成後,可以返回一個結果(對於
Callable
任務),或者不返回任何結果(對於Runnable
任務)。 -
任務狀態更新:
ThreadPoolExecutor
會更新任務的狀態,包括任務的執行進度、執行結果等信息。 -
結果返回:如果任務是
Callable
任務,ThreadPoolExecutor
會將任務的執行結果封裝在Future
對象中返回給調用者。調用者可以通過Future
對象獲取任務的執行結果。 -
繼續處理下一個任務:工作線程完成當前任務後,會繼續從任務隊列中獲取下一個任務進行處理。
-
線程回收:如果線程池中的線程處於空閑狀態,並且空閑時間超過一定閾值,
ThreadPoolExecutor
可能會回收這些空閑線程,以避免資源的浪費。 -
異常處理:
ThreadPoolExecutor
會捕獲任務執行過程中的異常,並根據預定義的異常處理策略進行處理,比如記錄日誌、統計異常次數等。 -
線程池關閉:當不再需要線程池時,調用
ThreadPoolExecutor
的shutdown()
或shutdownNow()
方法來停止線程池的運行。關閉線程池的過程包括不再接收新任務、等待已提交的任務執行完成、銷毀工作線程等操作。
ThreadPoolExecutor
是 Java 中用於創建和管理線程池的核心類,通過其靈活的配置參數,可以實現對線程池的各種行為和特性進行定製。這個類提供了豐富的方法和選項,用於控制線程池的大小、任務隊列類型、拒絕策略、線程工廠等,以滿足不同場景下的需求。
3. 線程池實現的主要步驟
3.1 創建線程池
在 Java 中,可以使用 Executors
類提供的靜態方法創建線程池。以下是幾種常見的創建線程池的方法:
3.1.1 創建固定大小的線程池
固定大小的線程池將在初始化時創建指定數量的線程,並且不會增加或減少線程的數量。
ExecutorService executorService = Executors.newFixedThreadPool(10);//創建一個固定大小為 10 的線程池
3.1.2 創建單個線程的線程池
單個線程的線程池只會創建一個工作線程來執行任務。
ExecutorService executorService = Executors.newSingleThreadExecutor();
3.1.3 創建可根據需要自動調整大小的線程池
可根據需要自動調整大小的線程池將根據任務的數量動態地增加或減少線程的數量。
ExecutorService executorService = Executors.newCachedThreadPool();
3.1.4 手動按自己需求創建線程池
在 Java 中,可以手動創建線程池,而不僅僅依賴於內置的線程池實現。手動創建線程池的主要原因是為了更好地控制線程池的行為、特性和參數配置,以滿足特定的需求,比如特定的任務隊列類型需求,特定的拒絕策略需求
根據ThreadPoolExecutor構造方法可知,需要準備以下參數:
-
核心線程數(corePoolSize):核心線程數是線程池中保持活動狀態的線程數。即使這些線程處於空閑狀態,它們也不會被回收。線程池會根據任務的數量和任務隊列的狀態來動態調整線程池中的線程數量。
-
最大線程數(maximumPoolSize):最大線程數指定了線程池中允許存在的最大線程數量。當任務數量超過核心線程數並且任務隊列已滿時,線程池會創建新的線程來處理任務,直到達到最大線程數。如果達到最大線程數後仍有任務到來,採用拒絕策略處理新任務。
-
空閑線程存活時間(keepAliveTime):當線程池中的線程數超過核心線程數,並且處於空閑狀態時,空閑線程存活時間指定了它們在沒有接收到新任務時的存活時間。超過存活時間後,空閑線程將被回收,直到線程池中的線程數不超過核心線程數。
-
時間單位(unit):用於指定時間參數的單位,可以是秒、毫秒、微秒等。
-
任務隊列(workQueue):任務隊列用於存儲等待執行的任務。可以選擇合適的隊列類型,如有界隊列(如
ArrayBlockingQueue
)或無界隊列(如LinkedBlockingQueue
)。 -
線程工廠(threadFactory):線程工廠用於創建線程對象。可以自定義線程工廠類,實現創建線程的邏輯。
-
拒絕策略(rejectedExecutionHandler):當任務隊列已滿並且線程池中的線程數達到最大線程數時,拒絕策略指定瞭如何處理新的任務。可以選擇預定義的拒絕策略,如拋出異常、丟棄任務等,或者自定義拒絕策略。
import java.util.concurrent.*; // 自定義拒絕策略類 class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { // 自定義拒絕策略的邏輯 System.out.println("Task Rejected: " + runnable.toString()); // 可根據需求進行不同的處理方式,如拋出異常、丟棄任務、調用者執行等 } } // 自定義線程工廠類 class CustomThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable runnable) { // 自定義線程工廠的邏輯 Thread thread = new Thread(runnable); // 可以進行一些線程屬性的配置,如設置線程名稱、優先順序等 thread.setName("CustomThread"); thread.setPriority(Thread.NORM_PRIORITY); return thread; } } public class ManualThreadPoolCreationExample { public static void main(String[] args) { // 創建自定義的拒絕策略實例 RejectedExecutionHandler rejectionHandler = new CustomRejectedExecutionHandler(); // 創建自定義的線程工廠實例 ThreadFactory threadFactory = new CustomThreadFactory(); // 創建任務隊列(這裡使用無界隊列) BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); // 創建線程池併進行手動配置 int corePoolSize = 10; int maxPoolSize = 20; long keepAliveTime = 60; TimeUnit timeUnit = TimeUnit.SECONDS; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, rejectionHandler ); } }
3.2 提交任務給線程池執行
創建線程池之後,可以將任務提交給線程池執行。任務可以是實現了 Runnable
介面的對象,也可以是實現了 Callable
介面的對象。
3.2.1 提交 Runnable
任務
executorService.execute(new Runnable() { @Override public void run() { // 任務邏輯 } }); 或者使用 Lambda 表達式: executorService.execute(() -> { // 任務邏輯 });
3.2.2 提交 Callable
任務
Callable
任務可以返回一個結果
Future<SomeResult> future = executorService.submit(new Callable<SomeResult>() { @Override public SomeResult call() throws Exception { // 任務邏輯 return someResult; } }); 或者使用 Lambda 表達式: Future<SomeResult> future = executorService.submit(() -> { // 任務邏輯 return someResult; });
3.3 關閉線程池
當不再需要線程池時,應該顯式地關閉它,以釋放資源。關閉線程池兩種方式
executorService.shutdown(); // 不再接受新的任務,但會等待已提交的任務執行完成。 executorService.shutdownNow(); //希望立即關閉線程池,並嘗試中斷正在執行的任務
3.4 處理任務執行結果
當提交任務給線程池執行後,可以通過 Future
對象來獲取任務的執行結果。
Future<SomeResult> future = executorService.submit(...); try { SomeResult result = future.get(); // 處理結果 } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理執行異常 }
使用 future.get()
方法可以阻塞當前線程,直到任務執行完成並返回結果。get()
方法可能會拋出 InterruptedException
和 ExecutionException
異常,需要進行適當的異常處理。