前言:線程池技術是通過對線程資源的統一管理來達到對線程資源的重覆利用,降低線程頻繁創建和銷毀的開銷。java jdk在java.util.concurrent併發包中有一套現成的對線程池的實現方案,我們可以直接拿來使用,快速實現多線程併發編程場景。這裡對concurrent包中的線程池框架的實現進行 ...
前言:線程池技術是通過對線程資源的統一管理來達到對線程資源的重覆利用,降低線程頻繁創建和銷毀的開銷。java jdk在java.util.concurrent併發包中有一套現成的對線程池的實現方案,我們可以直接拿來使用,快速實現多線程併發編程場景。這裡對concurrent包中的線程池框架的實現進行一些分析。
java線程池使用代碼示例
public class Test { public static void main(String[] args) throws Exception { Task task1 = new Task(1); Task task2 = new Task(2); // ExecutorService normalExecutor = new ThreadPoolExecutor(2, 4, 200, TimeUnit.MILLISECONDS, // new ArrayBlockingQueue<Runnable>(5)); // ExecutorService singleExecutor = Executors.newSingleThreadExecutor(); // ExecutorService cachedExecutor = Executors.newCachedThreadPool(); //創建線程池服務
ExecutorService executor = Executors.newFixedThreadPool(2);
//將任務交給線程池執行 executor.execute(task1); executor.execute(task2); executor.shutdown(); } } //可以提交給線程池執行的任務類,線程池執行任務時會執行其中的run方法 class Task implements Runnable { private int taskNum; public Task(int num) { this.taskNum = num; } public void run() { System.out.println("開始執行任務:" + taskNum); try { Thread.currentThread().sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("結束執行任務:" + taskNum); } }
執行結果如下:
從結果可以看出後提交給線程池的任務先執行了。所以執行execute方法時只是將任務提交給線程池去管理,任務的執行順序是由線程池內部去協調的。
java線程池實現原理
java線程池的核心實現類是ThreadPoolExecutor,該類的繼承關係如下:
最底層其實現的介面Executor的定義如下:
public interface Executor { void execute(Runnable command); }
可以看到,該介面只有一個方法execute,ThreadPoolExecutor實現該方法後,通過該方法的調用將任務提交給線程池。所以ThreadPoolExecutor.execute里的邏輯就是線程池執行任務的密碼所在。
這裡先關註下ThreadPoolExecutor類中如下幾個比較重要的常量
//記錄當前線程池中工作線程的數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //將一個整形的32位分為兩部分,高3位和低29位 private static final int COUNT_BITS = Integer.SIZE - 3; //將整形的低29位用於存儲工作線程數,所以可開啟的最大線程數為2的29次方 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將整形的高3位用於存儲線程池的當前狀態值 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor里很多地方使用了類似的位運算的方式進行狀態值的存儲和邏輯運算來提高運行效率
現在看下ThreadPoolExecutor類的構造函數,構造函數的參數揭示了線程池的核心變數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
這裡涉及的幾個核心變數解釋如下:
corePoolSize:核心線程數,線程池運行穩定後維持的線程數
maximumPoolSize:最大線程數,線程池最多可以使用的線程數
keepAliveTime:超時時間,線程在該超時時間間隔內從任務隊列未獲取到任務時,若線程池工作線程數超過corePoolSize,則關閉當前線程,且線程池線程數減1
unit:keepAliveTime使用的時間單位
workQueue:任務存放的隊列
threadFactory:線程工廠,線程池使用該工廠類的方法產生線程
handler:當線程池中線程數已達最大值,且任務隊列已滿,無法處理新加入的任務時。由自定義的handler處理該任務
ThreadPoolExecutor.execute對任務的處理流程如下圖:
ThreadPoolExecutor.execute的執行示意圖如下:
通過ThreadPoolExecutor構造函數的參數,我們發現如果我們要通過ThreadPoolExecutor創建一個適合我們業務場景的線程池,需要對ThreadPoolExecutor的運行原理和幾個核心參數有比較深入的理解。線程池的設計者在這方面也做了一定的考慮,在concurrent包中,提供了一個有用的工具類Executors,這個類提供了一些工廠方法可以幫助我們簡單方便的創建出適用於各種場景的線程池,有些方法就是對ThreadPoolExecutor做了簡單的封裝。其中,業務上比較常用到的獲取線程池的工廠方法有如下幾個:
//創建固定大小的線程池,在併發任務比較多的場景中比較常用 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //創建一個單線程化的線程池,線程池只使用一個線程執行所有的任務,可以保證任務的執行順序 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //創建一個可緩存線程池,隊列只能存放一個元素,任務會及時被線程處理,適用於對任務處理的及時性要求比較高的場景 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }