jdk線程池ThreadPoolExecutor優雅停止原理解析(自己動手實現線程池)(二)

来源:https://www.cnblogs.com/xiaoxiongcanguan/archive/2022/11/17/16901298.html
-Advertisement-
Play Games

jdk線程池工作原理解析(二) 本篇博客是jdk線程池ThreadPoolExecutor工作原理解析系列博客的第二篇,在第一篇博客中從源碼層面分析了ThreadPoolExecutor在RUNNING狀態下處理任務的核心邏輯,而在這篇博客中將會詳細講解jdk線程池ThreadPoolExecuto ...


jdk線程池工作原理解析(二)

本篇博客是jdk線程池ThreadPoolExecutor工作原理解析系列博客的第二篇,在第一篇博客中從源碼層面分析了ThreadPoolExecutor在RUNNING狀態下處理任務的核心邏輯,而在這篇博客中將會詳細講解jdk線程池ThreadPoolExecutor優雅停止的實現原理。

ThreadPoolExecutor優雅停止源碼分析(自己動手實現線程池v2版本)

ThreadPoolExecutor為了實現優雅停止功能,為線程池設置了一個狀態屬性,其共有5種情況。
在第一篇博客中曾介紹過,AtomicInteger類型的變數ctl同時維護了兩個業務屬性當前活躍工作線程個數與線程池狀態,其中ctl的高3位用於存放線程池狀態。

線程池工作狀態介紹

線程池工作狀態是單調推進的,即從運行時->停止中->完全停止。共有以下五種情況

1. RUNNING

RUNNING狀態,代表著線程池處於正常運行(運行時)。RUNNING狀態的線程池能正常的接收並處理提交的任務
ThreadPoolExecutor初始化時對ctl賦予的預設屬性便是RUNNING(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)
RUNNING狀態下線程池正常工作的原理已經在第一篇博客中詳細的介紹過了,這裡不再贅述。

2. SHUTDOWN

SHUTDOWN狀態,代表線程池處於停止對外服務的狀態(停止中)。不再接收新提交的任務,但依然會將workQueue工作隊列中積壓的任務逐步處理完。
用戶可以通過調用shutdown方法令線程池由RUNNING狀態進入SHUTDOWN狀態,shutdown方法會在下文詳細展開分析。

3. STOP

STOP狀態,代表線程池處於停止狀態。不再接受新提交的任務(停止中),同時也不再處理workQueue工作隊列中積壓的任務,當前還在處理任務的工作線程將收到interrupt中斷通知
用戶可以通過調用shutdownNow方法令線程池由RUNNING或者SHUTDOWN狀態進入STOP狀態,shutdownNow方法會在下文詳細展開分析。

4. TIDYING

TIDYING狀態,代表著線程池即將完全終止,正在做最後的收尾工作(停止中)。
線上程池中所有的工作線程都已經完全退出,且工作隊列中的任務已經被清空時會由SHUTDOWN或STOP狀態進入TIDYING狀態。

5. TERMINATED

TERMINATED狀態,代表著線程池完全的關閉(完全停止)。
線程池狀態流轉圖

public class MyThreadPoolExecutorV2 implements MyThreadPoolExecutor {
    /**
     * 當前線程池中存在的worker線程數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /**
     * 32位的有符號整數,有3位是用來存放線程池狀態的,所以用來維護當前工作線程個數的部分就只能用29位了
     * 被占去的3位中,有1位原來的符號位,2位是原來的數值位
     * */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    /**
     * 線程池狀態poolStatus常量(狀態值只會由小到大,單調遞增)
     * 線程池狀態遷移圖:
     *         ↗ SHUTDOWN ↘
     * RUNNING       ↓       TIDYING → TERMINATED
     *         ↘   STOP   ↗
     * 1 RUNNING狀態,代表著線程池處於正常運行的狀態。能正常的接收並處理提交的任務
     * 線程池對象初始化時,狀態為RUNNING
     * 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     *
     * 2 SHUTDOWN狀態,代表線程池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作隊列中積壓的任務處理完
     * 調用了shutdown方法時,狀態由RUNNING -> SHUTDOWN
     * 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
     *
     * 3 STOP狀態,代表線程池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作隊列中積壓的任務,當前還在處理任務的工作線程將收到interrupt中斷通知
     * 之前未調用shutdown方法,直接調用了shutdownNow方法,狀態由RUNNING -> STOP
     * 之前先調用了shutdown方法,後調用了shutdownNow方法,狀態由SHUTDOWN -> STOP
     * 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
     *
     * 4 TIDYING狀態,代表著線程池即將完全終止,正在做最後的收尾工作
     * 當前線程池狀態為SHUTDOWN,任務被消費完工作隊列workQueue為空,且工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
     * 當前線程池狀態為STOP,工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
     * 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
     *
     * 5 TERMINATED狀態,代表著線程池完全的關閉。之前線程池已經處於TIDYING狀態,且調用的鉤子函數terminated已返回
     * 當前線程池狀態為TIDYING,調用的鉤子函數terminated已返回
     * 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
     * */

    //  11100000 00000000 00000000 00000000
    private static final int RUNNING = -1 << COUNT_BITS;
    //  00000000 00000000 00000000 00000000
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    //  00100000 00000000 00000000 00000000
    private static final int STOP = 1 << COUNT_BITS;
    //  01000000 00000000 00000000 00000000
    private static final int TIDYING = 2 << COUNT_BITS;
    //  01100000 00000000 00000000 00000000
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }
    
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 推進線程池工作狀態
     * */
    private void advanceRunState(int targetState) {
        for(;;){
            // 獲得當前的線程池狀態
            int currentCtl = this.ctl.get();

            // 1 (runState >= targetState)如果當前線程池狀態不比傳入的targetState小
            // 代表當前狀態已經比參數要制定的更加快(或者至少已經處於對應階段了),則無需更新poolStatus的狀態(或語句中第一個條件為false,直接break了)
            // 2  (this.ctl.compareAndSet),cas的將runState更新為targetState
            // 如果返回true則說明cas更新成功直接break結束(或語句中第一個條件為false,第二個條件為true)
            // 如果返回false說明cas爭搶失敗,再次進入while迴圈重試(或語句中第一個和第二個條件都是false,不break而是繼續執行迴圈重試)
            if (runStateAtLeast(currentCtl, targetState) ||
                    this.ctl.compareAndSet(
                            currentCtl,
                            ctlOf(targetState, workerCountOf(currentCtl)
                            ))) {
                break;
            }
        }
    }
}    
  • 因為線程池狀態不是單獨存放,而是放在ctl這一32位數據的高3位的,讀寫都比較麻煩,因此提供了runStateOf和ctlOf等輔助方法(位運算)來簡化操作。
  • 線程池的狀態是單調遞進的,由於巧妙的將狀態靠前的值設置的更小,因此通過直接比較狀態的值來判斷當前線程池狀態是否推進到了指定的狀態(runStateLessThan、runStateAtLeast、isRunning、advanceRunState)。

jdk線程池ThreadPoolExecutor優雅停止具體實現原理

線程池的優雅停止一般要能做到以下幾點:

  1. 線程池在中止後不能再受理新的任務
  2. 線程池中止的過程中,已經提交的現存任務不能丟失(等待剩餘任務執行完再關閉或者能夠把剩餘的任務吐出來還給用戶)
  3. 線程池最終關閉前,確保創建的所有工作線程都已退出,不會出現資源的泄露

下麵我們從源碼層面解析ThreadPoolExecutor,看看其是如何實現上述這三點的.

如何中止線程池

ThreadPoolExecutor線程池提供了shutdown和shutdownNow這兩個public方法給使用者用於發出線程池的停止指令。

shutdown方法

shutdown方法用於關閉線程池,並令線程池從RUNNING狀態轉變位SHUTDOWN狀態。位於SHUTDOWN狀態的線程池,不再接收新任務,但已提交的任務會全部被執行完。

    /**
     * 關閉線程池(不再接收新任務,但已提交的任務會全部被執行)
     * 但不會等待任務徹底的執行完成(awaitTermination)
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的資源訪問和更新,直接通過互斥鎖防併發
        mainLock.lock();
        try {
            // 用於shutdown/shutdownNow時的安全訪問許可權
            checkShutdownAccess();
            // 將線程池狀態從RUNNING推進到SHUTDOWN
            advanceRunState(SHUTDOWN);
            // shutdown不會立即停止所有線程,而僅僅先中斷idle狀態的多餘線程進行回收,還在執行任務的線程就慢慢等其執行完
            interruptIdleWorkers();
            // 單獨為ScheduledThreadPoolExecutor開的一個鉤子函數(hook for ScheduledThreadPoolExecutor)
            onShutdown();
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止線程池
        tryTerminate();
    }

    /**
     * 用於shutdown/shutdownNow時的安全訪問許可權
     * 檢查當前調用者是否有許可權去通過interrupt方法去中斷對應工作線程
     * */
    private void checkShutdownAccess() {
        // 判斷jvm啟動時是否設置了安全管理器SecurityManager
        SecurityManager security = System.getSecurityManager();
        // 如果沒有設置,直接返回無事發生

        if (security != null) {
            // 設置了許可權管理器,驗證當前調用者是否有modifyThread的許可權
            // 如果沒有,checkPermission會拋出SecurityException異常
            security.checkPermission(shutdownPerm);
    
            // 通過上述校驗,檢查工作線程是否能夠被調用者訪問
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (MyWorker w : workers) {
                    // 檢查每一個工作線程中的thread對象是否有許可權被調用者訪問
                    security.checkAccess(w.thread);
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * 中斷所有處於idle狀態的線程
     * */
    private void interruptIdleWorkers() {
        // 預設打斷所有idle狀態的工作線程
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /**
     * 中斷處於idle狀態的線程
     * @param onlyOne 如果為ture,至多只中斷一個工作線程(可能一個都不中斷)
     *                如果為false,中斷workers內註冊的所有工作線程
     * */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
                Thread t = w.thread;
                // 1. t.isInterrupted(),說明當前線程存在中斷信號,之前已經被中斷了,無需再次中斷
                // 2. w.tryLock(), runWorker方法中如果工作線程獲取到任務開始工作,會先進行Lock加鎖
                // 則這裡的tryLock會加鎖失敗,返回false。 而返回true的話,就說明當前工作線程是一個idle線程,需要被中斷
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // tryLock成功時,會將內部state的值設置為1,通過unlock恢復到未加鎖的狀態
                        w.unlock();
                    }
                }
                if (onlyOne) {
                    // 參數onlyOne為true,至多只中斷一個工作線程
                    // 即使上面的t.interrupt()沒有執行,也在這裡跳出迴圈
                    break;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    /**
     * 單獨為jdk的ScheduledThreadPoolExecutor開的一個鉤子函數
     * 由ScheduledThreadPoolExecutor繼承ThreadExecutor時重寫(包級別訪問許可權)
     * */
    void onShutdown() {}
  1. shutdown方法在入口處使用mainLock加鎖後,通過checkShutdownAccess檢查當前是否有許可權訪問工作線程(前提是設置了SecurityManager),如果無許可權則會拋出SecurityException異常。
  2. 通過advanceRunState方法將線程池狀態推進到SHUTDOWN。
  3. 通過interruptIdleWorkers使用中斷指令(Thread.interrupt)喚醒所有處於idle狀態的工作線程(存在idle狀態的工作線程代表著當前工作隊列是空的)。
    idle的工作線程在被喚醒後從getTask方法中退出(getTask中對應的退出邏輯在下文中展開),進而退出runWorker方法,最終系統回收掉工作線程占用的各種資源(第一篇博客中runWorker的解析中提到過)。
  4. 調用包級別修飾的鉤子函數onShutdown。這一方法是作者專門為同為java.util.concurrent包下的ScheduledThreadPoolExecutor提供的拓展,不在本篇博客中展開。
  5. 前面提到SHUTDOWN狀態的線程池在工作線程都全部退出且工作隊列為空時會轉變為TIDYING狀態,因此通過調用tryTerminate方法嘗試終止線程池(當前不一定會滿足條件,比如調用了shutdown但工作隊列還有很多任務等待執行)。
    tryTerminate方法中細節比較多,下文中再展開分析。
shutdownNow方法

shutdownNow方法同樣用於關閉線程池,但比shutdown方法更加激進。shutdownNow方法令線程池從RUNNING狀態轉變為STOP狀態,不再接收新任務,而工作隊列中未完成的任務會以列表的形式返回給shutdownNow的調用者。

  • shutdown方法在調用後,雖然不再接受新任務,但會等待工作隊列中的隊列被慢慢消費掉;而shutdownNow並不會等待,而是將當前工作隊列中的所有未被撈取執行的剩餘任務全部返回給shutdownNow的調用者,並對所有的工作線程(包括非idle的線程)發出中斷通知。
  • 這樣做的好處是線程池可以更快的進入終止態,而不必等剩餘的任務都完成,都返回給用戶後也不會丟任務。
    /**
     * 立即關閉線程池(不再接收新任務,工作隊列中未完成的任務會以列表的形式返回)
     * @return 當前工作隊列中未完成的任務
     * */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;

        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的資源訪問和更新,直接通過互斥鎖防併發
        mainLock.lock();
        try {
            // 用於shutdown/shutdownNow時的安全訪問許可權
            checkShutdownAccess();
            // 將線程池狀態從RUNNING推進到STOP
            advanceRunState(STOP);
            interruptWorkers();

            // 將工作隊列中未完成的任務提取出來(會清空線程池的workQueue)
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止線程池
        tryTerminate();
        return tasks;
    }

   /**
    * shutdownNow方法內,立即終止線程池時該方法被調用
    * 中斷通知所有已經啟動的工作線程(比如等待在工作隊列上的idle工作線程,或者run方法內部await、sleep等,令其拋出中斷異常快速結束)
    * */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
              // 遍歷所有的worker線程,已啟動的工作線程全部調用Thread.interrupt方法,發出中斷信號
              w.interruptIfStarted();
            }
        } finally {
            mainLock.unlock();
        }
    }

   /**
    * 將工作隊列中的任務全部轉移出來
    * 用於shutdownNow緊急關閉線程池時將未完成的任務返回給調用者,避免任務丟失
    * */
   private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> queue = this.workQueue;
        ArrayList<Runnable> taskList = new ArrayList<>();
        queue.drainTo(taskList);
        // 通常情況下,普通的阻塞隊列的drainTo方法可以一次性的把所有元素都轉移到taskList中
        // 但jdk的DelayedQueue或者一些自定義的阻塞隊列,drainTo方法無法轉移所有的元素
        // (比如DelayedQueue的drainTo方法只能轉移已經不需要延遲的元素,即getDelay()<=0)
        if (!queue.isEmpty()) {
           // 所以在這裡打一個補丁邏輯:如果drainTo方法執行後工作隊列依然不為空,則通過更基礎的remove方法把隊列中剩餘元素一個一個的迴圈放到taskList中
           for (Runnable r : queue.toArray(new Runnable[0])) {
              if (queue.remove(r)) {
                taskList.add(r);
              }
           }
        }
        
        return taskList;
   }    
  1. shutdownNow方法在入口處使用mainLock加鎖後,與shutdown方法一樣也通過checkShutdownAccess檢查當前是否有許可權訪問工作線程(前提是設置了SecurityManager),如果無許可權則會拋出SecurityException異常。
  2. 通過advanceRunState方法將線程池狀態推進到STOP。
  3. 通過interruptWorkers使用中斷指令(Thread.interrupt)喚醒所有工作線程(區別於shutdown中的interruptIdleWorkers)。區別在於除了idle的工作線程,所有正在執行任務的工作線程也會收到中斷通知,期望其能儘快退出任務的執行。
  4. 通過drainQueue方法將當前工作線程中剩餘的所有任務以List的形式統一返回給調用者。
  5. 通過調用tryTerminate方法嘗試終止線程池。

如何保證線程池在中止後不能再受理新的任務?

在execute方法作為入口,提交任務的邏輯中,v2版本相比v1版本新增了一些基於線程池狀態的校驗(和jdk的實現保持一致了)。

execute方法中的校驗
  • 首先在execute方法中,向工作隊列加入新任務前(workQueue.offer)對當前線程池的狀態做了一個校驗(isRunning(currentCtl))。希望非RUNNING狀態的線程池不向工作隊列中添加新任務
    但在做該檢查時可能與shutdown/shutdownNow內推進線程池狀態的邏輯併發執行,所以在工作隊列成功加入任務後還需要再檢查一次線程池狀態,如果此時已經不是RUNNING狀態則需要通過remove方法將剛入隊的任務從隊列中移除,並調用reject方法(拒絕策略)
addWorker方法中的校驗
  • 在addWorker方法的入口處(retry:第一層迴圈通過(runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())))邏輯,
    保證了不是RUNNING狀態的線程池(runState >= SHUTDOWN),無法創建新的工作線程(addWorker返回false)。
    但有一種特殊情況:即SHUTDOWN狀態下(runState == SHUTDOWN),工作隊列不為空(!workQueue.isEmpty()),且不是第一次提交任務時創建新工作線程(firstTask == null),
    依然允許創建新的工作線程,因為即使在SHUTDOWN狀態下,某一存活的工作線程發生中斷異常時,會調用processWorkerExit方法,在銷毀原有工作線程後依然需要調用addWorker重新創建一個新的(firstTask == null)
execute與shutdown/shutdownNow併發時的處理

execute提交任務時addWorker方法和shutdown/shutdownNow方法是可能併發執行的,但addWorker中有多處地方都對線程池的狀態進行了檢查,盡最大的可能避免線程池停止時繼續創建新的工作線程。

  1. retry迴圈中,compareAndIncrementWorkerCount方法會cas的更新狀態(此前獲取到的ctl狀態必然是RUNNING,否則走不到這裡),cas成功則會跳出retry:迴圈( break retry;)。
    而cas失敗可能有兩種情況:
    如果是workerCount發生了併發的變化,則在內層的for (;;)迴圈中進行重試即可
    如果線程池由於收到終止指令而推進了狀態,則隨後的if (runStateOf(currentCtl) != runState)將會為true,跳出到外層的迴圈重試(continue retry)
  2. 在new Worker(firstTask)後,使用mainLock獲取鎖後再一次檢查線程池狀態(if (runState < SHUTDOWN ||(runState == SHUTDOWN && firstTask == null)))。
    由於shutdown、shutdownNow也是通過mainLock加鎖後才推進的線程池狀態,因此這裡獲取到的狀態是準確的。
    如果校驗失敗(if結果為false),則workers中不會加入新創建的工作線程,臨時變數workerAdded=false,則工作線程不會啟動(t.start())。臨時變數workerStarted也為false,最後會調用addWorkerFailed將新創建的工作線程回收掉(回滾)

基於execute方法和addWorker方法中關於各項關於線程池停止狀態校驗,最大程度的避免了線程池在停止過程中新任務的提交和可能的新工作線程的創建。使得execute方法線上程池接收到停止指令後(>=SHUTDOWN),最終都會去執行reject拒絕策略邏輯。

/**
     * 提交任務,並執行
     * */
    @Override
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException("command參數不能為空");
        }

        int currentCtl = this.ctl.get();
        if (workerCountOf(currentCtl) < this.corePoolSize) {
            // 如果當前存在的worker線程數量低於指定的核心線程數量,則創建新的核心線程
            boolean addCoreWorkerSuccess = addWorker(command,true);
            if(addCoreWorkerSuccess){
                // addWorker添加成功,直接返回即可
                return;
            }

            // addWorker失敗了
            // 失敗的原因主要有以下幾個:
            // 1 線程池的狀態出現了變化,比如調用了shutdown/shutdownNow方法,不再是RUNNING狀態,停止接受新的任務
            // 2 多個線程併發的execute提交任務,導致cas失敗,重試後發現當前線程的個數已經超過了限制
            // 3 小概率是ThreadFactory線程工廠沒有正確的返回一個Thread

            // 獲取最新的ctl狀態
            currentCtl = this.ctl.get();
        }

        // 走到這裡有兩種情況
        // 1 因為核心線程超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞隊列
        // 2 addWorker返回false,創建核心工作線程失敗

        // 判斷當前線程池狀態是否為running
        // 如果是running狀態,則進一步執行任務入隊操作
        if(isRunning(currentCtl) && this.workQueue.offer(command)){
            // 線程池是running狀態,且workQueue.offer入隊成功

            int recheck = this.ctl.get();
            // 重新檢查狀態,避免在上面入隊的過程中線程池併發的關閉了
            // 如果是isRunning=false,則進一步需要通過remove操作將剛纔入隊的任務刪除,進行回滾
            if (!isRunning(recheck) && remove(command)) {
                // 線程池關閉了,執行reject操作
                reject(command);
            } else if(workerCountOf(currentCtl) == 0){
                // 在corePoolSize為0的情況下,當前不存在存活的核心線程
                // 一個任務在入隊之後,如果當前線程池中一個線程都沒有,則需要兜底的創建一個非核心線程來處理入隊的任務
                // 因此firstTask為null,目的是先讓任務先入隊後創建線程去拉取任務並執行
                addWorker(null,false);
            }else{
                // 加入隊列成功,且當前存在worker線程,成功返回
                return;
            }
        }else{
            // 阻塞隊列已滿,嘗試創建一個新的非核心線程處理
            boolean addNonCoreWorkerSuccess = addWorker(command,false);
            if(!addNonCoreWorkerSuccess){
                // 創建非核心線程失敗,執行拒絕策略(失敗的原因和前面創建核心線程addWorker的原因類似)
                reject(command);
            }else{
                // 創建非核心線程成功,成功返回
                return;
            }
        }
    }
/**
     * 向線程池中加入worker
     * */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry標識外層迴圈
        retry:
        for (;;) {
            int currentCtl = ctl.get();
            int runState = runStateOf(currentCtl);

            // Check if queue empty only if necessary.
            // 線程池終止時需要返回false,避免新的worker被創建
            // 1 先判斷runState >= SHUTDOWN
            // 2 runState >= SHUTDOWN時,意味著不再允許創建新的工作線程,但有一種情況例外
            // 即SHUTDOWN狀態下(runState == SHUTDOWN),工作隊列不為空(!workQueue.isEmpty()),還需要繼續執行
            // 比如在當前存活的線程發生中斷異常時,會調用processWorkerExit方法,在銷毀原有工作線程後調用addWorker重新創建一個新的(firstTask == null)
            if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                // 線程池已經是關閉狀態了,不再允許創建新的工作線程,返回false
                return false;
            }

            // 用於cas更新workerCount的內層迴圈(註意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
            for (;;) {
                // 判斷當前worker數量是否超過了限制
                int workerCount = workerCountOf(currentCtl);
                if (workerCount >= CAPACITY) {
                    // 當前worker數量超過了設計上允許的最大限制
                    return false;
                }
                if (core) {
                    // 創建的是核心線程,判斷當前線程數是否已經超過了指定的核心線程數
                    if (workerCount >= this.corePoolSize) {
                        // 超過了核心線程數,創建核心worker線程失敗
                        return false;
                    }
                } else {
                    // 創建的是非核心線程,判斷當前線程數是否已經超過了指定的最大線程數
                    if (workerCount >= this.maximumPoolSize) {
                        // 超過了最大線程數,創建非核心worker線程失敗
                        return false;
                    }
                }

                // cas更新workerCount的值
                boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
                if (casSuccess) {
                    // cas成功,跳出外層迴圈
                    break retry;
                }

                // 重新檢查一下當前線程池的狀態與之前是否一致
                currentCtl = ctl.get();  // Re-read ctl
                if (runStateOf(currentCtl) != runState) {
                    // 從外層迴圈開始continue(因為說明在這期間 線程池的工作狀態出現了變化,需要重新判斷)
                    continue retry;
                }

                // compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;

        MyWorker newWorker = null;
        try {
            // 創建一個新的worker
            newWorker = new MyWorker(firstTask);
            final Thread myWorkerThread = newWorker.thread;
            if (myWorkerThread != null) {
                // MyWorker初始化時內部線程創建成功

                // 加鎖,防止併發更新
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int runState = runStateOf(ctl.get());

                    // 重新檢查線程池運行狀態,滿足以下兩個條件的任意一個才創建新Worker
                    // 1 runState < SHUTDOWN
                    // 說明線程池處於RUNNING狀態正常運行,可以創建新的工作線程
                    // 2 runState == SHUTDOWN && firstTask == null
                    // 說明線程池調用了shutdown,但工作隊列不為空,依然需要新的Worker。
                    // firstTask == null標識著其不是因為外部提交新任務而創建新Worker,而是在消費SHUTDOWN前已提交的任務
                    if (runState < SHUTDOWN ||
                            (runState == SHUTDOWN && firstTask == null)) {
                        if (myWorkerThread.isAlive()) {
                            // 預檢查線程的狀態,剛初始化的worker線程必須是未喚醒的狀態
                            throw new IllegalThreadStateException();
                        }

                        // 加入worker集合
                        this.workers.add(newWorker);

                        int workerSize = workers.size();
                        if (workerSize > largestPoolSize) {
                            // 如果當前worker個數超過了之前記錄的最大存活線程數,將其更新
                            largestPoolSize = workerSize;
                        }

                        // 創建成功
                        workerAdded = true;
                    }
                } finally {
                    // 無論是否發生異常,都先將主控鎖解鎖
                    mainLock.unlock();
                }

                if (workerAdded) {
                    // 加入成功,啟動worker線程
                    myWorkerThread.start();
                    // 標識為worker線程啟動成功,並作為返回值返回
                    workerStarted = true;
                }
            }
        }finally {
            if (!workerStarted) {
                addWorkerFailed(newWorker);
            }
        }

        return workerStarted;
    }
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        // 當一個任務從工作隊列中被成功移除,可能此時工作隊列為空。嘗試判斷是否滿足線程池中止條件
        tryTerminate();
        return removed;
    }

如何保證中止過程中不丟失任務?

  1. 通過shutdown關閉線程池時,SHUTDOWN狀態的線程池會等待所有剩餘的任務執行完畢後再進入TIDYING狀態。
  2. 通過shutdownNow關閉線程池時,以返回值的形式將剩餘的任務吐出來還給用戶

中止前已提交的任務不會丟失;而中止後線程池也不會再接收新的任務(走拒絕策略)。這兩點共同保證了提交的任務不會丟失。

如何保證線程池最終關閉前,所有工作線程都已退出?

線程池在收到中止命令進入SHUTDOWN或者STOP狀態時,會一直等到工作隊列為空且所有工作線程都中止退出後才會推進到TIDYING階段。
上面描述的條件是一個複合的條件,其只有在“收到停止指令(進入SHUTDOWN或者STOP狀態)”、"工作隊列中任務被移除或消費(工作隊列為空)"或是“工作線程退出(所有工作線程都中止退出)”這三類事件發生時才有可能滿足。
而判斷是否滿足條件並推進到TIDYING狀態的關鍵就在tryTerminate方法中。tryTerminate顧名思義便是用於嘗試終止線程池的,當上述任意事件觸發時便判斷是否滿足終止條件,如果滿足則將線程池推進到TIDYING階段。
因此在ThreadPoolExecutor中tryTerminate一共在6個地方被調用,分別是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法。

  • shutdown、shutdownNow方法觸發收到停止指令的事件
  • remove、purge方法觸發工作隊列中任務被移除的事件
  • addWorkerFailed、processWorkerExit方法觸發工作線程退出的事件
tryTerminate源碼分析
   /**
     * 嘗試判斷是否滿足線程池中止條件,如果滿足條件,將其推進到最後的TERMINATED狀態
     * 註意:必須在任何可能觸發線程池中止的場景下調用(例如工作線程退出,或者SHUTDOWN狀態下隊列工作隊列為空等)
     * */
    final void tryTerminate() {
        for (;;) {
            int currentCtl = this.ctl.get();
            if (isRunning(currentCtl)
                    || runStateAtLeast(currentCtl, TIDYING)
                    || (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
                // 1 isRunning(currentCtl)為true,說明線程池還在運行中,不滿足中止條件
                // 2 當前線程池狀態已經大於等於TIDYING了,說明之前別的線程可能已經執行過tryTerminate,且通過了這個if校驗,不用重覆執行了
                // 3 當前線程池是SHUTDOWN狀態,但工作隊列中還有任務沒處理完,也不滿足中止條件
                // 以上三個條件任意一個滿足即直接提前return返回
                return;
            }

            // 有兩種場景會走到這裡
            // 1 執行了shutdown方法(runState狀態為SHUTDOWN),且當前工作線程已經空了
            // 2 執行了shutdownNow方法(runState狀態為STOP)
            // 這個時候需要令所有的工作線程都主動的退出來回收資源
            if (workerCountOf(currentCtl) != 0) {
                // 如果當前工作線程個數不為0,說明還有別的工作線程在工作中。
                // 通過interruptIdleWorkers(true),打斷其中的一個idle線程,嘗試令其也執行runWorker中的processWorkerExit邏輯,並執行tryTerminate
                // 被中斷的那個工作線程也會執行同樣的邏輯(getTask方法返回->processWorkerExit->tryTerminate)
                // 這樣可以一個接著一個的不斷打斷每一個工作線程,令其逐步的退出(比起一次性的通知所有的idle工作線程,這樣相對平滑很多)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // 線程池狀態runState為SHUTDOWN或者STOP,且存活的工作線程個數已經為0了
            // 雖然前面的interruptIdleWorkers是一個一個中斷idle線程的,但實際上有的工作線程是因為別的原因退出的(恰好workerCountOf為0了)
            // 所以這裡是可能存在併發的,因此通過mainLock加鎖防止併發,避免重覆的terminated方法調用和termination.signalAll方法調用
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // cas的設置ctl的值為TIDYING+工作線程個數0(防止與別的地方ctl併發更新)
                if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
                    try {
                        // cas成功,調用terminated鉤子函數
                        terminated();
                    } finally {
                        // 無論terminated鉤子函數是否出現異常
                        // cas的設置ctl的值為TERMINATED最終態+工作線程個數0(防止與別的地方ctl併發更新)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知使用awaitTermination方法等待線程池關閉的其它線程(通過termination.await等待)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }

            // 如果上述對ctl變數的cas操作失敗了,則進行重試,再來一次迴圈
            // else retry on failed CAS
        }
    }
如何保證工作線程一定能成功退出?

從上面tryTerminate方法的實現中可以看到,線程池必須等到所有工作線程都全部退出(workerCount為0),工作線程占用的全部資源都回收後才會推進到終止態。
那麼之前啟動的工作線程一定能通過processWorkerExit退出並銷毀嗎?答案是不一定,這主要取決於用戶是否正確的編寫了令工作線程安全退出的任務邏輯。
因為只有能退出任務執行邏輯(runWorker方法中的task.run())的工作線程才有機會執行processWorkerExit,無法從任務中跳出(正常退出or拋異常)的工作線程將永遠無法退出,導致線程池也永遠無法推進到終態。

下麵分情況討論:

  • 任務中的邏輯是一定會執行完正常結束的(沒有無限迴圈也沒有令線程陷入阻塞態的操作)。那麼這是沒問題的
   ()->{
        // 會正常結束的
        System.out.println("hello world!");
   };
  • 任務中存在需要無限迴圈的邏輯。那麼最好在迴圈條件內監聽一個volatile的變數,當需要線程池停止時,修改這個變數,從而令任務從無限迴圈中正常退出。
    ()->{
        // 無限迴圈
        while(true){
            System.out.println("hello world!");
        }
    };
    ()->{
        // 無限迴圈時監聽一個變數
        while(!isStop) {
            System.out.println("hello world!");
        }
    };
  • 任務中存在Condition.await等會阻塞當前線程,令其無法自然退出的邏輯。
    tryTerminate中停止工作線程時會調用Worker類的interruptIfStarted方法發出中斷指令(Thread.interrupt方法),如果被阻塞的方法是響應中斷的,那麼業務代碼中不能無腦吞掉InterruptedException,而要能感知到中斷異常,在確實要關閉線程池時令任務退出(向上拋異常或正常退出)。
    而如果是不響應中斷的阻塞方法(如ReentrantLock.lock),則需要用戶自己保證這些方法最終能夠被喚醒,否則工作線程將無法正常退出而阻止線程池進入終止狀態。
    ()->{
            try {
                new ReentrantLock().newCondition().await();
            } catch (InterruptedException e) {
                // doSomething處理一些邏輯後。。。
                // 向上拋出異常
                throw new XXXException(e);
            }
        }
    ()->{
        try {
            new ReentrantLock().newCondition().await();
        } catch (InterruptedException e) {

        }
        // doSomething處理一些邏輯後。。。正常退出
    }
為什麼不線上程池終止時使用Thread.stop方法強制令工作線程停止呢?

雖然Thread.stop能夠保證線程一定會被停止,但由於停止的過程中存在很嚴重的併發安全問題而被廢棄而不推薦使用了。
具體原因可以參考官方文檔(Why is Thread.stop deprecated?):https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

總結

  • 本篇博客從源碼的角度詳細分析了jdk線程池ThreadPoolExecutor關於優雅停止實現的原理。其中重點介紹了ThreadPoolExecutor是如何做到中止後不能再受理新的任務、中止時不丟失已提交任務以及關閉時不會發生線程資源的泄露等核心功能。
  • 結合之前發佈的第一篇關於ThreadPoolExecutor正常運行時接受並執行所提交任務的博客,雖然沒有100%的覆蓋ThreadPoolExecutor的全部功能,但依然完整的講解了ThreadPoolExecutor最核心的功能。希望這兩篇博客能幫助到對jdk線程池實現原理感興趣的讀者。
  • 本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模塊 MyThreadPoolExecutorV2) 內容如有錯誤,還請多多指教。

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 11月15日,HMS Core手語服務在2022(第二十一屆)中國互聯網大會 “互聯網助力經濟社會數字化轉型”案例評選活動中,榮獲“特別推薦案例”。 經過一年多的技術迭代和經驗積累,HMS Core手語服務已與多個行業的開發者合作,將AI手語翻譯能力應用在了教育、社交、新聞、政務辦理等場景,助力開發 ...
  • 項目同步git:https://gitee.com/lixin_ajax/vue3-vite-ts-pinia-vant-less.git 覺得有幫助的小伙伴請點下小心心哦 為避免贅述,過於基礎的點會直接省略或貼圖,比如創建文件夾/文件的路徑/路由一類 配置相應功能,也儘量只貼相關代碼,並不代表整個 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 視頻通話SDK用的即構的,uniapp插件市場地址 推送用的極光的,uniapp插件市場地址 即構音視頻SDK uniapp插件市場的貌似是有些問題,導入不進項目,直接去官網下載,然後放到項目下的 nativeplugins 目錄下,在配 ...
  • 在工作流頁面中,除了特定的業務表單信息外,往往也需要同時展示通用申請單的相關信息,因此在頁面設計的時候需要使用一些組件化的概念來實現動態的內容展示處理,本篇隨筆介紹Vue3+TypeScript+ElementPus的前端工作流模塊中實現統一的表單編輯和表單詳情查看處理。 ...
  • 前言: 昨天我們學習了 TS 的數據類型,不知道大家回去以後練習沒練習,如果你練習了一定會發現一個問題,我們的 TS 好像和 JS 不太一樣 JS 寫完之後直接就可以放到頁面上,就可以用了,而我們的 TS 需要用 tsc 編譯一下,編譯為 JS 才能在頁面中使用 這時就會有同學說了,誒呀,六扇老師, ...
  • 限流,通常講就是限制流量,也有很多其他的說法,比如:限頻、疲勞度控制等。 原文鏈接:自定義開發限流組件 之 場景需求分析-一隻小Coder 最近遇到一個需求,系統A作為一個專門推送消息給客戶的消息中心系統,對於每個客戶是否能接受消息,能接受多少消息,接收消息的速度,能接受哪些消息等都要進行控制,這也 ...
  • 5.4 介面開發-根據id刪除附件 第2-1-2章 傳統方式安裝FastDFS-附FastDFS常用命令 第2-1-3章 docker-compose安裝FastDFS,實現文件存儲服務 第2-1-5章 docker安裝MinIO實現文件存儲服務-springboot整合minio-minio全網最 ...
  • 自己的客服系統做好了,官網頁面也有了,但是沒有介紹性的內容文章。網站被收錄的太少,這樣會導致網站的權重不高,搜索排名比較低。 因此要簡單的加上一個小型的內容管理功能。 設計資料庫 很簡單的兩張表,分類表和內容表 DROP TABLE IF EXISTS `cms_cate`; CREATE TABL ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...