【Java併發編程】21、線程池ThreadPoolExecutor源碼解析

来源:https://www.cnblogs.com/wangzhongqiu/archive/2018/03/11/8546724.html
-Advertisement-
Play Games

一、前言 JUC這部分還有線程池這一塊沒有分析,需要抓緊時間分析,下麵開始ThreadPoolExecutor,其是線程池的基礎,分析完了這個類會簡化之後的分析,線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量非同步任務時提供增強的性能,並且還可以提供綁定和管理資源(包 ...


一、前言

  JUC這部分還有線程池這一塊沒有分析,需要抓緊時間分析,下麵開始ThreadPoolExecutor,其是線程池的基礎,分析完了這個類會簡化之後的分析,線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量非同步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行任務集時使用的線程)的方法。下麵開始分析。

二、ThreadPoolExecutor數據結構

   在ThreadPoolExecutor的內部,主要由BlockingQueue和AbstractQueuedSynchronizer對其提供支持,BlockingQueue介面有多種數據結構的實現,如LinkedBlockingQueueArrayBlockingQueue等,而AbstractQueuedSynchronizer在之前有過詳細的分析,有興趣的讀者可以參考。

三、ThreadPoolExecutor源碼分析

  3.1 類的繼承關係

public class ThreadPoolExecutor extends AbstractExecutorService {}

  說明:ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService執行方法的預設實現。

  3.2 類的內部類

  ThreadPoolExecutor的核心內部類為Worker,其對資源進行了復用,減少創建線程的開銷,還有若幹個策略類。內部類的類圖如下

  說明:可以看到Worker繼承了AQS抽象類並且實現了Runnable介面,其是ThreadPoolExecutor的核心內部類。而對於AbortPolicy,用於被拒絕任務的處理程式,它將拋出 RejectedExecutionException、CallerRunsPolicy,用於被拒絕任務的處理程式,它直接在 execute 方法的調用線程中運行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務、DiscardPolicy,用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務、DiscardOldestPolicy,用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。這些都是拒絕任務提交時的所採用的不同策略。

  ① Worker類

  1. 類的繼承關係  

private final class Worker 
    extends AbstractQueuedSynchronizer 
    implements Runnable {}

   說明:Worker繼承了AQS抽象類,其重寫了AQS的一些方法,並且其也可作為一個Runnable對象,從而可以創建線程Thread。

  2. 類的屬性

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        // 版本號
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        // worker 所對應的線程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // worker所對應的第一個任務
        Runnable firstTask;
        /** Per-thread task counter */
        // 已完成任務數量
        volatile long completedTasks;
    }

  說明:Worker屬性中比較重要的屬性如下,Thread類型的thread屬性,用來封裝worker(因為worker為Runnable對象),表示一個線程;Runnable類型的firstTask,其表示該worker所包含的Runnable對象,即用戶自定義的Runnable對象,完成用戶自定義的邏輯的Runnable對象;volatile修飾的long類型的completedTasks,表示已完成的任務數量。

  3. 類的構造函數

Worker(Runnable firstTask) {
            // 設置狀態為-1
            setState(-1); // inhibit interrupts until runWorker
            // 初始化第一個任務
            this.firstTask = firstTask;
            // 根據當前worker,初始化線程
            this.thread = getThreadFactory().newThread(this);
        }

  說明:用於構造一個worker對象,並設置AQS的state為-1,同時初始化了對應的域。

  4. 核心函數分析

// 重寫了Runnable的run方法
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        // 是否被獨占,0代表未被獨占,1代表被獨占
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        // 嘗試獲取
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) { // 比較並設置狀態成功
                // 設置獨占線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 嘗試釋放
        protected boolean tryRelease(int unused) {
            // 設置獨占線程為null
            setExclusiveOwnerThread(null);
            // 設置狀態為0
            setState(0);
            return true;
        }
        // 獲取鎖
        public void lock()        { acquire(1); }
        // 嘗試獲取鎖
        public boolean tryLock()  { return tryAcquire(1); }
        // 釋放鎖
        public void unlock()      { release(1); }
        // 是否被獨占
        public boolean isLocked() { return isHeldExclusively(); }
        // 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // AQS狀態大於等於0並且worker對應的線程不為null並且該線程沒有被中斷
                try {
                    // 中斷線程
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

  說明:Worker的函數主要是重寫了AQS的相應函數和重寫了Runnable的run函數,重寫的函數比較簡單,具體的可以參見AQS的分析,這裡不再累贅。

  3.3 類的屬性  

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 線程池的控制狀態(用來表示線程池的運行狀態(整形的高3位)和運行的worker數量(低29位))
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29位的偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大容量(2^29 - 1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 線程運行狀態,總共有5個狀態,需要3位來表示(所以偏移量的29 = 32 - 3)
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 阻塞隊列
    private final BlockingQueue<Runnable> workQueue;
    // 可重入鎖
    private final ReentrantLock mainLock = new ReentrantLock();
    // 存放工作線程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 終止條件
    private final Condition termination = mainLock.newCondition();
    // 最大線程池容量
    private int largestPoolSize;
    // 已完成任務數量
    private long completedTaskCount;
    // 線程工廠
    private volatile ThreadFactory threadFactory;
    // 拒絕執行處理器
    private volatile RejectedExecutionHandler handler;
    // 線程等待運行時間
    private volatile long keepAliveTime;
    // 是否運行核心線程超時
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小
    private volatile int corePoolSize;
    // 最大線程池大小
    private volatile int maximumPoolSize;
    // 預設拒絕執行處理器
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    //
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
}

  說明:這裡著重講解一下AtomicInteger類型的ctl屬性,ctl為線程池的控制狀態,用來表示線程池的運行狀態(整形的高3位)和運行的worker數量(低29位)),其中,線程池的運行狀態有如下幾種

複製代碼
    /**
    * RUNNING    :    接受新任務並且處理已經進入阻塞隊列的任務
    * SHUTDOWN    :    不接受新任務,但是處理已經進入阻塞隊列的任務
    * STOP        :    不接受新任務,不處理已經進入阻塞隊列的任務並且中斷正在運行的任務
    * TIDYING    :    所有的任務都已經終止,workerCount為0, 線程轉化為TIDYING狀態並且調用terminated鉤子函數
    * TERMINATED:    terminated鉤子函數已經運行完成
    **/
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

  說明:由於有5種狀態,最少需要3位表示,所以採用的AtomicInteger的高3位來表示,低29位用來表示worker的數量,即最多表示2^29 - 1。

  3.4 類的構造函數

  1. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>)型構造函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  說明:該構造函數用給定的初始參數和預設的線程工廠及被拒絕的執行處理程式創建新的 ThreadPoolExecutor。

   2. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)型構造函數  

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

  說明:該構造函數用給定的初始參數和預設被拒絕的執行處理程式創建新的 ThreadPoolExecutor

   3. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)型構造函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

  說明:該構造函數用給定的初始參數和預設的線程工廠創建新的 ThreadPoolExecutor

   4. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)型構造函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||                                                // 核心大小不能小於0
            maximumPoolSize <= 0 ||                                            // 線程池的初始最大容量不能小於0
            maximumPoolSize < corePoolSize ||                                // 初始最大容量不能小於核心大小
            keepAliveTime < 0)                                                // keepAliveTime不能小於0
            throw new IllegalArgumentException();                                
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 初始化相應的域
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

  說明:該構造函數用給定的初始參數創建新的 ThreadPoolExecutor,其他的構造函數都會調用到此構造函數。

  3.5 核心函數分析

  1. execute函數  

public void execute(Runnable command) {
        if (command == null) // 命令為null,拋出異常
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        /*
        * 進行下麵三步
        *
        * 1. 如果運行的線程小於corePoolSize,則嘗試使用用戶定義的Runnalbe對象創建一個新的線程
        *     調用addWorker函數會原子性的檢查runState和workCount,通過返回false來防止在不應
        *     該添加線程時添加了線程
        * 2. 如果一個任務能夠成功入隊列,在添加一個線城時仍需要進行雙重檢查(因為在前一次檢查後
        *     該線程死亡了),或者當進入到此方法時,線程池已經shutdown了,所以需要再次檢查狀態,
        *    若有必要,當停止時還需要回滾入隊列操作,或者當線程池沒有線程時需要創建一個新線程
        * 3. 如果無法入隊列,那麼需要增加一個新線程,如果此操作失敗,那麼就意味著線程池已經shut
        *     down或者已經飽和了,所以拒絕任務
        */
        // 獲取線程池控制狀態
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // worker數量小於corePoolSize
            if (addWorker(command, true)) // 添加worker
                // 成功則返回
                return;
            // 不成功則再次獲取線程池控制狀態
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 線程池處於RUNNING狀態,將命令(用戶自定義的Runnable對象)添加進workQueue隊列
            // 再次檢查,獲取線程池控制狀態
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) // 線程池不處於RUNNING狀態,將命令從workQueue隊列中移除
                // 拒絕執行命令
                reject(command);
            else if (workerCountOf(recheck) == 0) // worker數量等於0
                // 添加worker
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 添加worker失敗
            // 拒絕執行命令
            reject(command);
    }

   當在客戶端調用submit時,之後會間接調用到execute函數,其在將來某個時間執行給定任務,此方法中並不會直接運行給定的任務。此方法中主要會調用到addWorker函數。

addWorker函數說明:第一個參數firstTask不為null,則創建的線程就會先執行firstTask對象,然後去阻塞隊列中取任務,否直接到阻塞隊列中獲取任務來執行。第二個參數,core參數為真,則用corePoolSize作為池中線程數量的最大值;為假,則以maximumPoolSize作為池中線程數量的最大值。

addWorker函數源碼: 

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) { // 外層無限迴圈
            // 獲取線程池控制狀態
            int c = ctl.get();
            // 獲取狀態
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&            // 狀態大於等於SHUTDOWN,初始的ctl為RUNNING,小於SHUTDOWN
                ! (rs == SHUTDOWN &&        // 狀態為SHUTDOWN
                   firstTask == null &&        // 第一個任務為null
                   ! workQueue.isEmpty()))     // worker隊列不為空
                // 返回
                return false;

            for (;;) {
                // worker數量
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||                                // worker數量大於等於最大容量
                    wc >= (core ? corePoolSize : maximumPoolSize))    // worker數量大於等於核心線程池大小或者最大線程池大小
                    return false;
                if (compareAndIncrementWorkerCount(c))                 // 比較並增加worker的數量
                    // 跳出外層迴圈
                    break retry;
                // 獲取線程池控制狀態
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) // 此次的狀態與上次獲取的狀態不相同
                    // 跳過剩餘部分,繼續迴圈
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // worker開始標識
        boolean workerStarted = false;
        // worker被添加標識
        boolean workerAdded = false;
        // 
        Worker w = null;
        try {
            // 初始化worker
            w = new Worker(firstTask);
            // 獲取worker對應的線程
            final Thread t = w.thread;
            if (t != null) { // 線程不為null
                // 線程池鎖
                final ReentrantLock mainLock = this.mainLock;
                // 獲取鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 線程池的運行狀態
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||                                    // 小於SHUTDOWN
                        (rs == SHUTDOWN && firstTask == null)) {            // 等於SHUTDOWN並且firstTask為null
                        if (t.isAlive()) // precheck that t is startable    // 線程剛添加進來,還未啟動就存活
                            // 拋出線程狀態異常
                            throw new IllegalThreadStateException();
                        // 將worker添加到worker集合
                        workers.add(w);
                        // 獲取worker集合的大小
                        int s = workers.size();
                        if (s > largestPoolSize) // 隊列大小大於largestPoolSize
                            // 重新設置largestPoolSize
                            largestPoolSize = s;
                        // 設置worker已被添加標識
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                if (workerAdded) { // worker被添加
                    // 開始執行worker的run方法
                    t.start();
                    // 設置worker已開始標識
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) // worker沒有開始
                // 添加worker失敗
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  說明:此函數可能會完成如下幾件任務

  ① 通過spin和cas增加workcount

  ② 生成封裝新thread和firsttask的work,worker繼承了aqs,通過mainLock加鎖線程安全的將此worker添加進workers集合中,更新largestPoolSize。

  ③ 啟動worker對應的線程,運行runWorker方法。

  ④ 如果worker創建失敗,回滾,將worker從workers集合中刪除,並原子性的減少workerCount。

  2. runWorker函數  

final void runWorker(Worker w) {
        // 獲取當前線程
        Thread wt = Thread.currentThread();
        // 獲取w的firstTask
        Runnable task = w.firstTask;
        // 設置w的firstTask為null
        w.firstTask = null;
        // 釋放鎖(設置state為0,允許中斷)
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { // 任務不為null或者阻塞隊列還存在任務
                // 獲取鎖
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||    // 線程池的運行狀態至少應該高於STOP
                     (Thread.interrupted() &&                // 線程被中斷
                      runStateAtLeast(ctl.get(), STOP))) &&    // 再次檢查,線程池的運行狀態至少應該高於STOP
                    !wt.isInterrupted())                    // wt線程(當前線程)沒有被中斷
                    wt.interrupt();                            // 中斷wt線程(當前線程)
                try {
                    // 在執行之前調用鉤子函數
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 運行給定的任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 執行完後調用鉤子函數
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 增加給worker完成的任務數量
                    w.completedTasks++;
                    // 釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 處理完成後,調用鉤子函數
            processWorkerExit(w, completedAbruptly);
        }
    }

  說明:此函數中會實際執行給定任務(即調用用戶重寫的run方法),並且當給定任務完成後,會繼續從阻塞隊列中取任務,直到阻塞隊列為空(即任務全部完成)。在執行給定任務時,會調用鉤子函數,利用鉤子函數可以完成用戶自定義的一些邏輯。在runWorker中會調用到getTask函數和processWorkerExit鉤子函數,其中,getTask函數源碼如下  

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) { // 無限迴圈,確保操作成功
            // 獲取線程池控制狀態
            int c = ctl.get();
            // 運行的狀態
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大於等於SHUTDOWN(表示調用了shutDown)並且(大於等於STOP(調用了shutDownNow)或者worker阻塞隊列為空)
                // 減少worker的數量
                decrementWorkerCount();
                // 返回null,不執行任務
                return null;
            }
            // 獲取worker數量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允許coreThread超時或者workerCount大於核心大小

            if ((wc > maximumPoolSize || (timed && timedOut))     // worker數量大於maximumPoolSize
                && (wc > 1 || workQueue.isEmpty())) {            // workerCount大於1或者worker阻塞隊列為空(在阻塞隊列不為空時,需要保證至少有一個wc)
                if (compareAndDecrementWorkerCount(c))            // 比較並減少workerCount
                    // 返回null,不執行任務,該worker會退出
                    return null;
                // 跳過剩餘部分,繼續迴圈
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    // 等待指定時間
                    workQueue.take();                                        // 一直等待,直到有元素
                if (r != null)
                    return r;
                // 等待指定時間後,沒有獲取元素,則超時
                timedOut = true;
            } catch (InterruptedException retry) {
                // 拋出了被中斷異常,重試,沒有超時
                timedOut = false;
            }
        }
    }

  說明:此函數用於從workerQueue阻塞隊列中獲取Runnable對象,由於是阻塞隊列,所以支持有限時間等待(poll)和無限時間等待(take)。在該函數中還會響應shutDown和、shutDownNow函數的操作,若檢測到線程池處於SHUTDOWN或STOP狀態,則會返回null,而不再返回阻塞隊列中的Runnalbe對象。

  processWorkerExit函數是在worker退出時調用到的鉤子函數,而引起worker退出的主要因素如下

  ① 阻塞隊列已經為空,即沒有任務可以運行了。

  ② 調用了shutDown或shutDownNow函數

  processWorkerExit的源碼如下  

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果被中斷,則需要減少workCount    // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        // 獲取可重入鎖
        final ReentrantLock mainLock = this.mainLock;
        // 獲取鎖
        mainLock.lock();
        try {
            // 將worker完成的任務添加到總的完成任務中
            completedTaskCount += w.completedTasks;
            // 從workers集合中移除該worker
            workers.remove(w);
        } finally {
            // 釋放鎖
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
        // 獲取線程池控制狀態
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 小於STOP的運行狀態
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) // 允許核心超時並且workQueue阻塞隊列不為空
                    min = 1;
                if (workerCountOf(c) >= min) // workerCount大於等於min
                    // 直接返回
                    return; // replacement not needed
            }
            // 添加worker
            addWorker(null, false);
        }
    }

  說明:此函數會根據是否中斷了空閑線程來確定是否減少workerCount的值,並且將worker從workers集合中移除並且會嘗試終止線程池。

  3. shutdown函數

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查shutdown許可權
            checkShutdownAccess();
            // 設置線程池控制狀態為SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中斷空閑worker
            interruptIdleWorkers();
            // 調用shutdown鉤子函數
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
    }

  說明:此函數會按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。首先會檢查是否具有shutdown的許可權,然後設置線程池的控制狀態為SHUTDOWN,之後中斷空閑的worker,最後嘗試終止線程池。
嘗試終止線程池tryTerminate的源碼如下

final void tryTerminate() {
        for (;;) { // 無限迴圈,確保操作成功
            // 獲取線程池控制狀態
            int c = ctl.get();
            if (isRunning(c) ||                                            // 線程池的運行狀態為RUNNING
                runStateAtLeast(c, TIDYING) ||                            // 線程池的運行狀態最小要大於TIDYING
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))    // 線程池的運行狀態為SHUTDOWN並且workQueue隊列不為null
                // 不能終止,直接返回
                return;
            if (workerCountOf(c) != 0) { // 線程池正在運行的worker數量不為0    // Eligible to terminate
                // 僅僅中斷一個空閑的worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // 獲取線程池的鎖
            final ReentrantLock mainLock = this.mainLock;
            // 獲取鎖
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比較並設置線程池控制狀態為TIDYING
                    try {
                        // 終止,鉤子函數
                        termina

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 關於JWinner JWinner是一個JAVA項目的快速開發框架,他已經實現了大多數項目開發之前需要進行的一些必備工作,還有很多在開發過程中可能會用到的工具集。 JWinner的誕生並不是一蹴而就的,前身經歷了多個框架的摸索,在不同階段暴露出來的靈感和需求,都促使JWinner越來越靠近我心目中的 ...
  • 前段時間做項目需要讀取一個文件夾裡面所有的txt文件,查詢資料後得到以下實現方法:首先瞭解一下這個結構體struct _finddata_t { unsigned attrib; time_t time_create; time_t time_access; time_t time_write; _ ...
  • 索引和切片 索引 是從0開始計數;當索引值為負數時,表示從最後一個元素(從右到左)開始計數 切片 用於截取某個範圍內的元素,通過:來指定起始區間(左閉右開區間,包含左側索引值對應的元素,但不包含右測索引值對應的元素。 示例 hello world 字元串是一種序列,序列基本都有下麵這些操作 len( ...
  • 參考書 : <<振動分析>> 張準 汪鳳泉 編著 東南大學出版社 ISBN 7-80123-583-4 參考章節 : 4.6.2 和 4.6.3 <<數值分析>> 崔瑞彩 謝偉松 天津大學出版社 ISBN 7-5618-1366-X 參考章節 : 3.1 參考資料: <<交替使用冪法和降階法求解矩陣 ...
  • 實現思路 將所需要的數字存入一個列表中 看下圖你就明白了: 實現代碼 快速排序比較冒泡排序效率要高得多~ ...
  • 字元串 字元串或串(String)是由數字、字母、下劃線組成的一串字元,用雙引號或單引號包裹的為字元串 下麵示例: 語法錯誤 第一行出現三個單引號,Python 解析器匹配不上成對的引號,所以報錯。 解決方法:1、可使用雙引號包裹 2、可以使用反斜杠\ 轉義字元 字元串 、數字互轉 內置函數int( ...
  • Python機器學習介紹(Python Machine Learning 中文版) 機器學習,如今最令人振奮的電腦領域之一。看看那些大公司,Google、Facebook、Apple、Amazon早已展開了一場關於機器學習的軍備競賽。從手機上的語音助手、垃圾郵件過濾到逛淘寶時的物品推薦,無一不用... ...
  • Python機器學習 機器學習,如今最令人振奮的電腦領域之一。看看那些大公司,Google、Facebook、Apple、Amazon早已展開了一場關於機器學習的軍備競賽。從手機上的語音助手、垃圾郵件過濾到逛淘寶時的物品推薦,無一不用到機器學習技術。 如果你對機器學習感興趣,甚至是想從事相關職業... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...