【Java 併發】詳解 ThreadPoolExecutor

来源:http://www.cnblogs.com/zhangjk1993/archive/2017/04/22/6749310.html
-Advertisement-
Play Games

前言 線程池是併發中一項常用的優化方法,通過對線程復用,減少線程的創建,降低資源消耗,提高程式響應速度。在 Java 中我們一般通過 Exectuors 提供的工廠方法來創建線程池,但是線程池的最終實現類是 ThreadPoolExecutor,下麵我們詳細分析一下 ThreadPoolExecut ...


前言

線程池是併發中一項常用的優化方法,通過對線程復用,減少線程的創建,降低資源消耗,提高程式響應速度。在 Java 中我們一般通過 Exectuors 提供的工廠方法來創建線程池,但是線程池的最終實現類是 ThreadPoolExecutor,下麵我們詳細分析一下 ThreadPoolExecutor 的實現。

基本使用

我們首先看下線程池的基本使用。在下麵的代碼中我們創建一個固定大小的線程池,該線程池中最多包含 5 個線程,當任務數量超過線程的數量時,就將任務添加到任務隊列,等線程空閑之後再從任務隊列中獲取任務。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Jikai Zhang on 2017/4/17.
 */
public class ThreadPoolDemo {

    static class WorkThread implements Runnable {
        private String command;

        public WorkThread(String command) {
            this.command = command;
        }

        @Override
        public void run() {
            System.out.println("Thread-" + Thread.currentThread().getId() + " start. Command=" + command);
            processCommand();
            System.out.println("Thread-" + Thread.currentThread().getId() + " end.");
        }

        private void processCommand() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {

            Runnable work = new WorkThread("" + i);
            executor.execute(work);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finish all threads.");
    }
}

概述

在分析線程池的具體實現之前,我們首先看下線程池具體的工作流程,只有先熟悉了流程,才能更好的理解線程池的實現。線程池一般都會關聯一個任務隊列,用來緩存任務,當線程執行完一個任務之後,會從任務隊列中取下一個任務。ThreadPoolExecutor 中使用阻塞隊列作為任務隊列,當任務隊列為空時,就會阻塞請求任務的線程。下麵是 ThreadPoolExecutor 整體的圖示:

圖片來自 Java 併發編程的藝術

下麵我們著重看下 ThreadPoolExecutor 添加任務和關閉線程池的流程。下圖是 ThreadPoolExecutor 添加任務的流程:

我們首先看下添加任務的具體流程:

  • 如果線程池中的線程數量少於 corePoolSize,那麼直接創建一個新的線程(不論線程池中是否有空閑線程),然後把該任務分配給新建線程,同時將線程加入到線程池中。
  • 如果線程池的線程數量大於等於 corePoolSize,就將任務添加到任務隊列
  • 如果任務隊列已經飽和(對於有邊界的任務隊列),那麼就看下線程池中的線程數量是否少於 maximumPoolSize,如果少於,就創建新的線程,將當前任務分配給新線程,同時將線程加入到線程池中。否則就對該任務執行 reject 策略。

在 ThreadPoolExecutor 中通過兩個量來控制線程池的大小:corePoolSize 和 maximumPoolSize。corePoolSize 表示正常狀態下線程池中應該持有的存活線程數量,maximumPoolSize 表示線程池可以持有的最大線程數量。當線程池中的線程數量不超過 corePoolSize 時,位於線程池中的線程被看作 core 線程,預設情況下,線程池不對 core 線程進行超時控制,也就是 core 線程會一直存活線上程池中,直到線程池被關閉(這裡忽略線程異常關閉的情況)。當線程池中的線程數量超過 corePoolSize 時,額外的線程被看作非 core 線程,線程池會對這部分線程進行超時控制,當線程空閑一段時間之後會銷毀該線程。非 core 線程主要用來處理某段時間併發任務特別多的情況,即之前的線程配置無法及時處理那麼多的任務量,需要額外的線程來幫助。而當這批任務處理完成之後,額外的線程就有些多餘了(線程越多占的資源越多),因此需要及時銷毀。

ThreadPoolExecutor 定義線程數量上限是 2^29 - 1 = 536870911(後面會講到為什麼是這個數),同時用戶可以自定義最大線程數量,ThreadPoolExecutor 處理時會選兩者之間的較小值。當線程池的線程數量等於 maximumPoolSize 時,說明線程池也已經飽和了,此時對於新來的任務就要執行 reject 策略,JDK 中定義了四種拒絕策略:

  • AbortPolicy:直接拋出異常,預設策略
  • CallerRunsPolicy:使用調用者所在的線程執行任務
  • DiscardOldestPolicy:丟棄當前任務隊列中最前面的任務,並執行 execute 方法添加新任務
  • DiscardPolicy:直接丟棄任務

下麵再看一下線程池的關閉。線程池的關閉分為兩種:平緩關閉(shutdown)和立即關閉(shutdownNow)。當調用 shutdown 方法之後,線程池不再接受新的任務,但是仍然會將任務隊列中已有的任務執行完畢。而調用 shutdownNow 方法之後,線程池不僅不再接受新的任務,也不會再執行任務隊列中剩餘的任務,同時會通過中斷的方式嘗試停止正在執行任務的線程(我們知道對於中斷,線程可能響應也可能不響應,所以不能保證一定停止線程)。

具體實現

下麵我們從源碼的角度分析一下 ThreadPoolExecutor 的實現。

Worker

ThreadPoolExecutor 中每個線程都關聯一個 Worker 對象,而 ThreadPool 里實際上保存的就是線程關聯的 Worker 對象。 Worker 類對線程進行包裝,它除了保存關聯線程的信息,還保存一些其他的信息,如線程創建時分配的首任務,線程已完成的任務數量。Worker 實現了 Runnable 介面,創建線程時往 Thread 類傳的參數就是該對象,所以線程創建後會執行 Worker 的 run 方法。同時 Worker 類還繼承了 AbstractQueuedSynchronizer,使自身成為一個不可重入的互斥鎖(以下稱為 Worker 鎖,註意 Worker 鎖是不可重入的,也就是說該鎖只能被一個線程獲取一次),因此每個線程實際上也關聯了一個互斥鎖。當線程執行任務時,需要首先獲得關聯的 Worker 鎖,執行完任務之後再釋放該鎖。Worker 鎖的主要作用是為了平緩關閉線程池時,判斷線程是否空閑(根據能否獲得 Worker 鎖),後續會詳細講解。下麵是 Worker 類的實現,我們只保留了一些必要的內容:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    // 當前 Worker 對象關聯的線程
    final Thread thread;
    // 線程創建後的初始任務
    Runnable firstTask;
    // 線程完成的任務數量
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // 只有 state 為 0,線程才能獲取到 Worker 鎖,這裡將 state 設為 -1,
        // 表明任何線程都無法獲取鎖,在 shutdown 方法中,如果要中斷線程,需要首先獲得線程
        // 關聯的 Worker 鎖,而 shutdownNow 中斷線程之前,會首先判斷 state 是否大於等於 0
        // 所以這裡將 state 設為 -1,可以防止當前線程被中斷
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 創建線程時將自身傳入
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    // 線程創建之後會運行該方法
    public void run() {
        runWorker(this);
    }

    // 只要線程啟動了,就中斷線程,用於 shutdownNow 方法
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {}
        }
    }
}

我們看到在 Worker 的構造函數中將 state 設為了 -1,註釋里給出的解釋是:禁止中斷直到執行了 runWorker 方法。其實這裡包含了兩個問題:1.為什麼要等到執行了 runWorker 方法 2.怎樣禁止中斷。對於第一個問題,我們知道中斷是針對運行的線程,當線程創建之後只有調用了 start 方法,線程才真正運行,而 start 方法的調用是在 runWorker 方法中的,也就是有隻有執行了 runWorker 方法,線程才真正啟動。對於第二個問題,這個主要是針對 shutdown 和 shutdownNow 方法的。在 shutdown 方法中,中斷線程之前會首先嘗試獲取線程的 Worker 鎖,只有獲得了 Worker 鎖才對線程進行中斷。而獲得 Worker 鎖的前提是 Worker 的鎖的狀態變數 state 為 0,當 state 設為 -1 之後,任何線程都無法獲得該鎖,那麼也就無法對線程執行中斷操作。而在 shutdownNow 方法中,會調用 Worker 的 interruptIfStarted 方法來中斷線程,而 interruptIfStarted 方法只有在 state >= 0 時才會中斷線程,所以將 state 設為 -1 可以防止線程被提前中斷。當執行 runWorker 方法時,會為傳入 Worker 對象執行 unlock 操作(也就是將 state 加 1),使 Worker 對象的 state 變為 0,這樣就使線程處於可被中斷的狀態了。

狀態變數

在 ThreadPoolExecutor 中定義了一個 AtomicInteger 類型的變數 ctl,用來保存線程池的狀態和線程數量信息。下麵是該變數的定義:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 使用低 29 位保存線程的數量(這就是線程池最大線程數量為 2^29-1 的原因),高 3 位保存線程池的狀態。為了提取出這兩個信息,ThreadPoolExecutor 定義了一個低 29 位全為 1 的變數 CAPACITY,通過和 CAPACITY 進行 & 運算可以獲得線程的數量,通過和 ~CAPACITY 進行 & 運算可以獲得線程池的狀態,下麵是程式中的實現:

// 存儲線程數量的 bit 位數,這裡是 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// 用於提取線程池的運行狀態以及線程數量,低 29 位全為 1,高 3 位為0
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 獲得線程池的運行狀態
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}

// 獲得線程的數量
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

ThreadPoolExecutor 中為線程池定義了五種狀態:

  • RUNNING:正常狀態,接受新的任務,並處理任務隊列中的任務
  • SHUTDOWN:不接受新的任務,但是處理已經在任務隊列中的任務
  • STOP: 不接受新的任務,也不處理已經在任務隊列中的任務,同時會嘗試停止正在執行任務的線程
  • TIDYING: 線程池和任務隊列都為空,該狀態下線程會執行 terminated() 方法
  • TERMINATED:terminated() 方法執行完畢

下麵是 JDK 中關於這 5 個變數的定義:

// 11100000000000000000000000000000  -536870912
private static final int RUNNING = -1 << COUNT_BITS;

// 00000000000000000000000000000000  0
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 00100000000000000000000000000000  536870912
private static final int STOP = 1 << COUNT_BITS;

// 01000000000000000000000000000000  1073741824
private static final int TIDYING = 2 << COUNT_BITS;

// 01100000000000000000000000000000  1610612736
private static final int TERMINATED = 3 << COUNT_BITS;

下麵是各狀態之間的轉換:

  • RUNNING -> SHUTDOWN:調用了 shutdown() 方法 (perhaps implicitly in finalize())
  • (RUNNING or SHUTDOWN) -> STOP:調用了shutdownNow() 方法
  • SHUTDOWN -> TIDYING:線程池和任務隊列都為空
  • STOP -> TIDYING:線程池為空
  • TIDYING -> TERMINATED:執行完 terminated() 方法

添加任務

通過 execute 或者 submit 方法都可以向線程池中添加一個任務,submit 會返回一個 Future 對象來獲取線程的返回值,下麵是 submit 方法的實現:

public Future <?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture <Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

我們看到 submit 中只是將 Runnable 對象包裝了一下,最終還是調用了 execute 方法。下麵我們看下 execute 方法的實現:

public void execute(Runnable command) {
    // command 不能為 null
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 線程數量少於 corePoolSize,會創建一個新的線程執行該任務
    if (workerCountOf(c) < corePoolSize) {
        // true 表示當前添加的線程為核心線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 線程數量大於等於 corePoolSize,首先嘗試將任務添加到任務隊列
    // workQueue.offer 會將任務添加到隊列尾部
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新檢查狀態
        int recheck = ctl.get();
        // 如果發現當前線程池不是處於 Running 狀態,就移除之前的任務
        // 移除任務過程有鎖保護
        if (!isRunning(recheck) && remove(command)) {
            reject(command);
        } else if (workerCountOf(recheck) == 0) {

            // workerCountOf 用來統計當前的工作線程數量,程式執行到這裡,有下麵兩種可能:
            //  1. 當前線程池處於 Running 狀態,但是工作線程數量為 0,
            //      需要創建新的線程
            //  2. 移除任務失敗,但是工作線程數量為 0,
            //      需要創建新的線程來完成移除失敗的任務
            //
            //  因為前面對任務做了判斷,所以正常情況下向 addWorker 里傳入的任務
            //  不可能為 null,這裡傳入 null 是告訴 addWorker 需要創建新的線程,
            //  在 addWorker 里對 null 有專門的處理邏輯
            addWorker(null, false);
        }
    // 下麵的 else 說明線程池不是 Running 狀態或者任務隊列滿了,
    } else if (!addWorker(command, false)) {
        // 這裡說明線程池不是 Running 狀態或者線程池飽和了
        reject(command);
    }
}

在前面我們提到了線程池添加任務的流程,這裡再重述一下

  • 如果線程池的線程數量少於 corePoolSize,則新建一個線程,執行當前任務,並將該任務加入到線程池
  • 如果線程池中的線程數量大於等於 corePoolSize,則首先將任務添加到任務隊列
  • 如果任務隊列已滿,則繼續創建線程,如果線程池達到了飽和值 maximumPoolSize,則調用 reject 策略處理該任務。

addWorker 方法會創建並啟動線程,當線程池不處於 Running 狀態並且傳入的任務不為 null,addWorker 就無法成功創建線程。下麵看下它的具體實現:

private boolean addWorker(Runnable firstTask, boolean core) {
    // retry 類似於 goto,continue retry 跳轉到 retry 定義,
    // 而 break retry 跳出 retry
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 我們在下麵詳細講解該條件
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 線程數量大於系統規定的最大線程數或者大於 corePoolSize/maximumPoolSize
            // 表明線程池中無法添加新的線程,這裡 wc >= CAPACITY 為了防止 corePoolSize
            // 或者 maximumPoolSize 大於CAPACITY
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
            // 使用 CAS 方式將線程數量增加,如果成功就跳出 retry
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }

            c = ctl.get(); // Re-read ctl
            // 如果線程池運行狀態發生了改變就從 retry(外層迴圈)處重新開始,
            if (runStateOf(c) != rs)
                continue retry;

            // 程式執行到這裡說 CAS 沒有成功,那麼就再次執行 CAS
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 創建 work
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // t != null 說明線程創建成功了
        if (t != null) {
            // 程式用一個 HashSet 存儲線程,而 HashSet 不是線程的安全的,
            // 所以將線程加入 HashSet 的過程需要加鎖。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1. rs < SHUTDOWN 說明程式在運行狀態
                // 2. rs == SHUTDOWN  說明當前線程處於平緩關閉狀態,而 firstTask == null
                //    說明當前創建的線程是為了處理任務隊列中剩餘的任務(故意傳入 null)
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 線程是存活狀態說明線程提前開始了。
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裡我們著重看下返回 false 的條件:

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
// 等價於
if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))

我們依次看下上面的條件:

  • rs >= SHUTDOWN && rs != SHUTDOWN:說明線程池處於 STOP,TIDYING 或者 TERMINATED 狀態下,處於這三種狀態說明線程池處理完了所有任務或者不再執行剩餘的任務,可以直接返回
  • rs == SHUTDOWN && firstTask != null:如果上面的條件不成立,說明當前線程池的狀態一定是處於 SHUTDOWN 狀態,在 execute 方法中,我們提到瞭如果傳入 null,說明創建線程是為了執行隊列中剩餘的任務(此時線程池中沒有工作線程),這時就不應該返回。而如果 firstTask != null,說明不是為了處理隊列中剩餘的任務,可以返回。
  • rs == SHUTDOWN && workQueue.isEmpty():說經任務隊列中的任務已經全部執行完了,無需創建新的線程,可以返回。

當創建了線程併成功啟動之後,會執行 Worker 的 run 方法,而該方法最終調用了 ThreadPoolExecutor 的 runWorker 方法,並且將自身作為參數傳進去了,下麵是 runWorker 方法的實現:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 這裡將 Worker 中的 state 設為 0,以便其他線程可以獲得鎖
    // 從而可以中斷當前線程
    w.unlock(); // allow interrupts
    // 用來標記線程是正常退出迴圈還是異常退出
    boolean completedAbruptly = true;
    try {
        // 如果任務不為空,說明是剛創建線程,如果任務為空,則從隊列中取任務
        // 如果隊列沒有任務,線程就會阻塞在這裡
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任務執行之前做一些處理,空函數,需要用戶定義處理邏輯
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    // 因為 runnable 方法不能拋出 checkedException ,所以這裡
                    // 將異常包裝成 Error 拋出
                    throw new Error(x);
                } finally {
                    // 任務執行完之後做一些處理,預設空函數
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

在上面的代碼中,第一個 if 判斷的邏輯有點難理解,我們將它拿出分析一下。

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
    && !wt.isInterrupted())
    wt.interrupt();

這段 if 代碼塊的功能有兩個:

  • 如果當前線程池的狀態小於 STOP,也就是處於 RUNNING 或者 SHUTDOWN 狀態,要保證線程池中的線程處於非中斷狀態
  • 如果當前線程池的狀態大於等於 STOP,也就是處於 STOP,TIDYING 或者 TERMINATED 狀態,要保證線程池中的線程處於中斷狀態

上面的 if 代碼中括弧比較多,我們先將其分為兩個大條件:

  • runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) &&
  • !wt.isInterrupted()

我們先看第二個條件:!wt.isInterrupted(),該條件說明當前線程沒有被中斷,只有線上程沒有被中斷的前提下,才有可能對線程執行中斷操作。然後我們將第一個大條件再進行拆分,可以分為下麵兩個條件:

  • runStateAtLeast(ctl.get(), STOP) ||
  • Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)

我們先看第一個條件,該條件說明線程處於 STOP 以及之後的狀態,線程應該被中斷。如果該條件不成立,說明當前線程不應該被中斷,那麼會調用 Thread.interrupted() 方法,該方法會首返回線程的中斷狀態,然後重置線程中斷狀態(設為 false),如果中斷狀態本來就為 false,那麼就可以就可以跳出 if 代碼塊了,但是如果中斷狀態是 true,說明線程被中斷過了,此時我們就要判斷線程的中斷是不是由 shutdownNow 方法(併發調用,該方法會中斷線程池的線程,並修改線程池狀態為 STOP,後面會講到)造成的,所以我們需要再檢查一下線程的狀態,如果發現當前線程池已經變為 STOP 或者之後的狀態,說明確實是由 shutdownNow 方法造成的,需要重新對線程進行中斷,如果不是那就不需要再中斷線程了。

我們看到在 runWorker 里會一直迴圈調用 getTask 來獲取任務,下麵來看下 getTask 的實現

/**
 * getTask 返回 null,說明當前線程需要被回收了
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // rs >= SHUTDOWN 說明當前線程池至少處於待關閉狀態,不再接受新的任務
        //  1. rs >= STOP: 說明不需要在再處理任務了(即便有任務)
        //  2. workQueue.isEmpty(): 說明任務隊列中剩餘的任務已經處理完了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed 用於判斷是否需要對線程進行超時控制
        //  1. allowCoreThreadTimeOut: 為 true 說明可以對 core 線程進行超時控制
        //  2. wc > corePoolSize: 說明線程池中有非 core 線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1. wc > maximumPoolSize || (timed && timedOut)
        //     線程數量大於 maximumPoolSize 值了 或者 允許超時控制並且超時了
        // 2. wc > 1 || workQueue.isEmpty()
        //     線程中活動線程的數量大於 1 或者 任務隊列為空(不需要在留線程執行剩餘的任務了)
        // 如果上面 1 和 2 都成立,就使用 CAS 將線程數量減 1 並返回 null 回收當前線程
        // 如果 CAS 失敗了就重試
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果允許超時控制,則執行 poll 方法,該方法響應超時,當 keepAliveTime 時間內
            // 仍然沒有獲取到任務,就返回 null。take 方法不響應超時操作,當獲取不到任務時會一直等待。
            // 另外不管 poll 還是 take 方法都會響應中斷,如果沒有新的任務添加到隊列中
            // 會直接拋出 InterruptedException
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            // 執行到這裡說明超時了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

當 getTask 返回 null 的時候說明線程需要被回收了,我們總結一下在 getTask 中返回 null 的情況:

  • 線程池總工作線程數量大於 maximumPoolSize(一般是由於我們調用 setMaximumPoolSize 方法重新設置了 maximumPoolSize)
  • 線程池已經被停止 (狀態 >= STOP)
  • 線程池處於 SHUTDOWN 狀態,並且任務隊列為空
  • 線程在等待任務時超時

我們將 runWorker 和 getTask 結合起來看,整個流程就比較明朗了:

  1. 通過 while 迴圈不斷的從任務隊列中獲取任務,如果當前任務隊列中沒有任務,就阻塞線程。如果 getTask 返回 null,表明當前線程應該被回收,執行回收線程的邏輯。
  2. 如果成功獲取任務,首先判斷線程池的狀態,根據線程池狀態設置當前線程的中斷狀態
  3. 在執行任務之前做一些預處理(用戶實現)
  4. 執行任務
  5. 在執行任務之後做一些後處理(用戶實現)

上面兩個方法是整個線程池中比較核心的部分,在這兩個方法中,完成了任務獲取與阻塞線程的工作。下麵是線程 提交 -> 處理任務 -> 回收 的流程圖:

下麵我們再看下 processWorkerExit 方法,該方法主要用來完成線程的回收工作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果 completedAbruptly 為 true,說明線程是由於拋出異常而跳出迴圈的,
    // 沒有正確執行 getTask 中減少線程數量的邏輯,所以這裡要將線程數量減一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 更新已完成的任務數量,並移除工作線程
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 嘗試終止線程池
    tryTerminate();

    int c = ctl.get();

    // 如果線程狀態是 SHUTDOWN 或者 RUNNING,需要保證線程中的最少線程數量
    // 1. 如果線程是由於拋出異常而結束的,直接添加一個線程
    // 2. 如果線程是正常結束的
    //    * 如果允許對 core 線程進行超時控制,並且任務隊列中有任務
    //      則保證線程數量大於等於 1
    //    * 如果不允許對 core 進行超時控制,則保證線程數量大於等於 corePoolSize
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

我們看到 processWorkerExit 中調用了 tryTerminate 方法,該方法主要用來終止線程池。如果線程池滿足終止條件,首先將線程池狀態設為 TIDYING,然後執行 terminated 方法,最後將線程池狀態設為 TERMINATED。在 shutdown 和 shutdownNow 方法中也會調用該方法 。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 如果出現下麵三種情況,就不執行終止線程池的邏輯,直接返回
        //  1. 當前線程池處於 RUNNING 狀態,不能停止
        //  2. 當前線程池狀態為 TIDYING 或者 TERMINATED,不需要停止
        //  3. 當前線程池狀態為 SHUTDOWN 並且任務隊列不為空
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        // 判斷工作線程的數量是否為 0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // 如果工作線程數量不為 0,就嘗試中斷正線上程池中的空閑線程
            // ONLY_ONE 說明只嘗試中斷線程池中第一個線程(不管線程空不空閑)
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將線程狀態設為 TIDYING,如果設置不成功說明線程池的狀態發生了變化,需要重試
            // 這裡線程池狀態從 TIDYING 到 TERMINATED 狀態轉換是原子的
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 執行 terminated 方法(預設空方法)
                    terminated();
                } finally {
                    // 將線程狀態設為 TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

在 tryTerminate 方法中, 如果滿足下麵兩個條件,就將線程池狀態設為 TERMINATED:

  1. 線程池狀態為 SHUTDOWN 並且線程池和任務隊列均為空
  2. 線程池狀態為 STOP 並且線程池為空

如果線程池處於 SHUTDOWN 或者 STOP 狀態,但是工作線程不為空,那麼 tryTerminate 會嘗試去中斷線程池中的一個線程,這樣做主要是為了防止 shutdown 的中斷信號丟失(我們在 shutdown 方法處再詳細討論)。下麵看下 interruptIdleWorkers 方法,該方法主要中斷 空閑 線程。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w: workers) {
            Thread t = w.thread;
            // 首先看當前線程是否已經中斷,如果沒有中斷,就看線程是否處於空閑狀態
            // 如果能獲得線程關聯的 Worker 鎖,說明線程處於空閑狀態,可以中斷
            // 否則說明線程不能中斷
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {} finally {
                    w.unlock();
                }
            }
            // 如果 onlyOne 為 true,只嘗試中斷第一個線程
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

關閉線程池

通過 shutdown 和 shutdownNow 我們可以關閉線程池,關於兩者的區別在前面已經提到了,這裡不再贅述。我們首先看下 shutdown 方法:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 檢查當前線程是否有關閉線程池的許可權
        checkShutdownAccess();
        // 將線程池狀態設為 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷線程,這裡最終調用 interruptIdleWorkers(false);
        interruptIdleWorkers();
        // hook 方法,預設為空,讓用戶線上程池關閉時可以做一些操作
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

在前面我們知道 interruptIdleWorkers 會先檢查線程是否是空閑狀態,如果發現線程不是空閑狀態,才會中斷線程。而這時中斷線程的主要目的是讓在任務隊列中阻塞的線程醒過來。考慮下麵的情況,如果執行 interruptIdleWorkers 時,線程正在運行,所以沒有被中斷,但是線程執行完任務之後,任務隊列恰好為空,線程就會處於阻塞狀態,而此時 shutdown 已經執行完 interruptIdleWorkers 操作了(即線程錯過了 shutdown 的中斷信號),如果沒有額外操作,線程會一直處於阻塞狀態。所以為了防止這種情況,在 tryTerminate() 中也增加了 interruptIdleWorkers 操作,主要就是為了彌補 shutdown 中丟失的信號。

最後我們再看下 shutdownNow 方法:

public List < Runnable > shutdownNow() {
    List < Runnable > tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 檢查線程是否具有關閉線程池的許可權
        checkShutdownAccess();
        // 更改線程狀態
        advanceRunState(STOP);
        // 中斷線程
        interruptWorkers();
        // 清除任務隊列,並將任務返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

然後我們看下 interruptWorkers 方法:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 不管線程是否空閑都執行中斷
        for (Worker w: workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

從上面的代碼中我們可以看到在 interruptWorkers 方法中,只要線程開始了,就對線程執行中斷,所以 shutdownNow 的中斷信號不會丟失。最後我們再看下 drainQueue 方法,該方法主要作用是清空任務隊列,並將隊列中剩餘的任務返回。

private List <Runnable> drainQueue() {
    BlockingQueue <Runnable> q = workQueue;
    ArrayList <Runnable> taskList = new ArrayList < Runnable > ();
    // 該方法會將阻塞隊列中的所有項添加到 taskList 中
    // 然後清空任務隊列,該方法是線程安全的
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        // 將 List 轉換為 數組,傳入的 Runnable[0] 用來說明是轉為 Runnable 數組
        for (Runnable r: q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

線程池監控

本節摘自 深入理解Java線程池:ThreadPoolExecutor

通過線程池提供的參數進行監控。線程池裡有一些屬性在監控線程池的時候可以使用

  • getTaskCount:線程池已經執行的和未執行的任務總數;
  • getCompletedTaskCount:線程池已完成的任務數量,該值小於等於taskCount;
  • getLargestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize;
  • getPoolSize:線程池當前的線程數量;
  • getActiveCount:當前線程池中正在執行任務的線程數量。

通過這些方法,可以對線程池進行監控,在ThreadPoolExecutor類中提供了幾個空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以擴展這些方法在執行前或執行後增加一些新的操作,例如統計線程池的執行任務的時間等,可以繼承自ThreadPoolExecutor來進行擴展。

參考文章


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這次的題目是這樣的: 假設有一個6*6的棋盤,每個格子裡面有一個獎品(每個獎品的價值在100到1000之間),現在要求從左上角開始到右下角結束,每次只能往右或往下走一個格子,所經過的格子里的獎品歸自己所有。問最多能收集價值多少的獎品。 最先看到這個問題的時候腦子裡面的立馬出現許多的腦洞:暴力、二叉樹 ...
  • 自動載入類 上傳代碼: ...
  • 本章介介紹了shutil,zipfile模塊的使用,我們先來認識一下這2個模塊吧。 一.shutil模塊 shutil模塊主要用於對文件或文件夾進行處理,包括:複製,移動,改名和刪除文件,在shutil模塊中主要以下這麼幾個函數: 1.複製文件和文件夾 shutil模塊提供了2個函數:shutil. ...
  • 1) { echo ""; return; } //HTTP頭部信息 header("Content-type: application/octet-stream"); header("Accept-Ranges: bytes"); he... ...
  • 下麵列出了當前可用的 PCRE 修飾符。括弧中提到的名字是 PCRE 內部這些修飾符的名稱。 模式修飾符中的空格,換行符會被忽略,其他字元會導致錯誤。 Warning This feature was DEPRECATED in PHP 5.5.0, and REMOVED as of PHP 7. ...
  • 讓我們來回憶下上次你是怎麼發佈你的代碼的: 1. 先把線上的代碼用ftp備份下來 2. 上傳修改了的文件 3. 測試一下功能是否正常 4. 網站500了,趕緊用備份替換回去 5. 替換錯了/替換漏了 6. 一臺伺服器發佈成功 7. 登錄每一臺執行一遍發佈操作 8. 加班搞定 9. 老闆發飆 ... ...
  • 1225 八數位難題 時間限制: 1 s 空間限制: 128000 KB 題目等級 : 鑽石 Diamond 題解 查看運行結果 1225 八數位難題 1225 八數位難題 時間限制: 1 s 空間限制: 128000 KB 題目等級 : 鑽石 Diamond 時間限制: 1 s 空間限制: 128 ...
  • 第五章感覺是第四章的練習項目,無非就是多了一個模擬登錄。 不分小節記錄了,直接上知識點,可能比較亂。 1.常見的httpcode: 2.怎麼找post參數? 先找到登錄的頁面,打開firebug,輸入錯誤的賬號和密碼,觀察post_url變換,從而確定參數。 3.讀取本地的文件,生成cookies。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...