Java Executor併發框架(二)剖析ThreadPoolExecutor運行過程

来源:http://www.cnblogs.com/vhua/archive/2016/03/17/5285819.html
-Advertisement-
Play Games

上一篇從整體上介紹了 介面,從上一篇我們知道了 框架的最頂層實現是 類, 工廠類中提供的 、`newFixedThreadPool newCachedThreadPool ThreadPoolExecutor ThreadPoolExecutor`線程池的運行過程。 1.線程池狀態 既然要講運行過程


上一篇從整體上介紹了Executor介面,從上一篇我們知道了Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPoolnewFixedThreadPoolnewCachedThreadPool方法其實也只是ThreadPoolExecutor的構造函數參數不同而已。通過傳入不同的參數,就可以構造出適用於不同應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor線程池的運行過程。


1.線程池狀態

既然要講運行過程,那麼首先要瞭解下線程池的狀態分為哪些?

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

ThreadPoolExecutor代碼中定義了上面幾個變數:定義了一個volatile變數runState,以及其他幾個表示狀態的常量。
runState:初始狀態,表示當前線程池的運行狀態,它的值就是上面的那4個常量值之一

RUNNING:線程池接受新任務並執行隊列任務中...

SHUTDOWN:不再接受新任務,但是會繼續執行等待隊列Queued中的任務。當調用了shutdown()方法,會從 RUNNING -> SHUTDOWN

STOP:不再接受新任務,同時也不執行等待隊列Queued中的任務,並且會嘗試終止正在執行中的任務。當調用了shutdownNow()方法, 會從(RUNNING or SHUTDOWN) -> STOP

TERMINATED:線程池中所有線程已經停止運行,其他行為同 STOP狀態。

  • 當等待隊列和線程池為空時,會從SHUTDOWN -> TERMINATED
  • 當線程池為空時,會從STOP -> TERMINATED

2.線程池運行任務

2.1變數介紹

在講解運行過程前,我們先看下ThreadPoolExecutor中的幾個比較重要的成員變數:

private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來保存等待中的任務,等待worker線程空閑時執行任務
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 時需要持有這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來保存工作中的執行線程
private volatile long  keepAliveTime; //超過corePoolSize外的線程空閑存活之間
private volatile boolean allowCoreThreadTimeOut; //是否對corePoolSize內的線程設置空閑存活時間
private volatile int   corePoolSize; //核心線程數
private volatile int   maximumPoolSize; //最大線程數(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   poolSize; //線程池中的當前線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來新建線程
private int largestPoolSize; //記錄線程池中出現過的最大線程數大小
private long completedTaskCount; //已經執行完的線程數

這邊重點解釋下 corePoolSizemaximumPoolSizeworkQueue兩個變數,這兩個變數涉及到線程池中創建線程個數的一個策略。
corePoolSize: 這個變數我們可以理解為線程池的核心大小,舉個例子來說明(corePoolSize假設等於10,maximumPoolSize等於20):

  1. 有一個部門,其中有10(corePoolSize)名工人,當有新任務來了後,領導就分配任務給工人去做,每個工人只能做一個任務。
  2. 當10個工人都在忙時,新來的任務就要放到隊列(workQueue)中等待。
  3. 當任務越積累越多,遠遠超過工人做任務的速度時,領導就想了一個辦法:從其他部門借10個工人來,借的數量有一個公式(maximumPoolSize - corePoolSize)來計算。然後把新來的任務分配給借來的工人來做。
  4. 但是如果速度還是還不急的話,可能就要採取措施來放棄一些任務了(RejectedExecutionHandler)。
    等到一定時間後,任務都完成了,工人比較閑的情況下,就考慮把借來的10個工人還回去(根據keepAliveTime判斷)
  5. 也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

2.2線程執行過程

先看下前一篇文章中的一個例子:

ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("finished: " + threadName);
        }));

上面代碼就是新建6個任務,然後扔到線程池中運行,輸出線程名稱,直到運行完畢。其中最核心的方法就是execute()方法,雖然submit()也可以執行任務,但它底層也是調用execute()方法,所以懂了execute()的實現原理即可:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {   //1.
            if (runState == RUNNING && workQueue.offer(command)) {    //2.
                if (runState != RUNNING || poolSize == 0)   //3.
                    ensureQueuedTaskHandled(command);  //4.
            }
            else if (!addIfUnderMaximumPoolSize(command))  //5.
                reject(command); // is shutdown or saturated //6
        }
    }

上面的代碼看起來邏輯有點複雜,我們一個一個看,首先看上面1位置處:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一個或表達式,它分成兩部分

  1. 首先判斷當前線程數是否大於等於核心線程數,是的話直接進入if語句塊中,否則判斷第二個部分
  2. 第二個部分addIfUnderCorePoolSize(command) ,這個方法是當線程數小於核心線程數時,用來新建線程執行任務(因為線程數小於corePoolSize時,直接新建線程來運行任務,不管當前線程池裡有沒有空閑的線程)。如果新建失敗,那麼進入if語句塊,成功了那麼execute方法就執行結束了,因為線程已經新建成功了,任務已經開始線上程池中運行。

進入if語句塊後,看上面代碼2.if (runState == RUNNING && workQueue.offer(command))

  1. 判斷當前線程池狀態是否是RUNNING 而且 任務放入等待隊列中成功,那麼直接進入if語句塊
  2. 否則到代碼5.處 if (!addIfUnderMaximumPoolSize(command)),判斷新任務用新線程執行是否成功(註:這裡的新線程就是我們上面講的 “借來的工人” maximumPoolSize)
  3. 如果“借來的工人”還是處理不了的話,執行任務拒絕策略

繼續進到代碼塊3 的if語句塊if (runState != RUNNING || poolSize == 0), 因為新任務加入到等待隊列中了,這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是的話,應急處理加入的新任務 ensureQueuedTaskHandled(command)


我們看下兩個關鍵方法的實現:
##### 1.addIfUnderCorePoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

首先獲取鎖,因為涉及到線程池狀態的變化。然後再次判斷 if (poolSize < corePoolSize && runState == RUNNING),在execute()方法中我們已經判斷過一次,這邊再次判斷是為了防止其他線程又新增了新線程或者調用了shutdown、shutdownNow方法,這邊起到了雙重檢查的一個效果。如果為true的話,進行t = addThread(firstTask)新增線程執行任務。addThread方法裡面比較簡單,就是通過線程工廠創建線程thread,然後封裝到Worker對象中,加入到 workers隊列中,並執行線程,可以把Worker對象看成是擁有一個線程的對象。

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();
                workerStarted = true;
            }
        }
        return t;
    }

這裡在介紹下Worker對象, 它實現了Runnable介面,你把它當成Runnable的一個代理類即可,最終也是執行它的run方法。只要註意一下Worker中的beforeExecuteafterExecute方法,這兩個方法在ThreadPoolExecutor中沒有具體實現,用戶可以重寫這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等,而afterExecute方法還有一個Throwable t參數,用戶可以用來記錄一些異常信息,因為新線程中的異常時捕獲不到的,需要在afterExecute中記錄。
看起來這個是不是和spring 切麵有點像,可以看到 知識都是相通的。
看一下它的run方法:

public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {  //1
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

註意代碼塊1,可以看到這邊在迴圈獲取任務,並執行,直到任務全部執行完畢。除了第一個任務,其他任務都是通過getTask()方法去取,這個方法是ThreadPoolExecutor中的一個方法。我們猜一下,整個類中只有任務緩存隊列中保存了任務,應該就是去緩存隊列中取了。

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll(); //取任務
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
                //則通過poll取任務,若等待一定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閑狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

這裡有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給 空閑線程執行。但是在這裡,並沒有採用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的線程Worker去任務緩存隊列裡面取任務來執行,因為每一個Worker裡面都包含了一個線程thread。


2. addIfUnderMaximumPoolSize

這個方法的實現思想和 addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是線上程 池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

到這裡,大部分朋友應該對任務提交給線程池之後到被執行的整個過程有了一個基本的瞭解,下麵總結一下:

  1. 首先,要清楚corePoolSize和maximumPoolSize的含義;
  2. 其次,要知道Worker是用來起到什麼作用的;
  3. 要知道任務提交給線程池之後的處理策略,這裡總結一下主要有4點:
  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於 corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

這篇寫完了,後面會介紹一下任務緩存隊列的種類已經緩存的策略以及任務拒絕策略等。如果文章有什麼問題,歡迎大家指正,大家互相溝通,互相學習。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 內聯函數的用處: 用空間換取時間,在調用時不用每次都寫調用的彙編。 什麼時候內聯: 比較小的函數:只有兩三行 在迴圈里迴圈調用的函數 什麼時候不內聯: 比較大的函數,2、30行的 遞歸的函數
  • 轉發自:http://blog.csdn.net/ligang7560/article/details/50890282 單例模式的多種實現方式 我們都知道單例模式有幾種常用的寫法: 餓漢模式 懶漢模式 雙重校驗鎖 靜態內部類 靜態代碼塊 我們來看一下這幾種模式在多線程的場景中,能否保持單例 1.餓
  • 繼續更新有關重構的博客,前三篇是關於類、函數和數據的重構的博客,內容還算比較充實吧。今天繼續更新,本篇博客的主題是關於條件表達式的重構規則。有時候在實現比較複雜的業務邏輯時,各種條件各種嵌套。如果處理不好的話,代碼看上去會非常的糟糕,而且業務邏輯看上去會非常混亂。今天就通過一些重構規則來對條件表達式
  • 原題重述:(點擊圖片可以進入來源鏈接) 這到題目的中文解釋是, 輸入一個數組,例如{-1 0 1 2 -1 -4},從數組中找三個數(a,b,c),使得其和0,輸出所有的(a,b,c)組合。 要求abc不能重覆,並且a<=b<=c。 拿到這個題目的時候,其實每個程式猿都能想到如下的演算法,也就是暴力破
  • 正則表達式並不是Python的一部分。正則表達式是用於處理字元串的強大工具,擁有自己獨特的語法以及一個獨立的處理引擎,效率上可能不如str自帶的方法,但功能十分強大。得益於這一點,在提供了正則表達式的語言里,正則表達式的語法都是一樣的,區別隻在於不同的編程語言實現支持的語法數量不同;但不用擔心,不被
  • 運行一下上面這段代碼,看會有什麼提示信息? Warning: preg_match(): Compilation failed: PCRE does not support L, l, N, P, p, U, u, or X at offset 3 in F:http://www.hzhuti.co
  • 演算法:用原來字母後面的第四個字母替代原來的字母。明文:China 密文:Glmre
  • Implement atoi to convert a string to an integer. Hint: Carefully consider all possible input cases. If you want a challenge, please do not see below
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...