ThreadPoolExecutor線程池內部處理淺析

来源:https://www.cnblogs.com/Jcloud/archive/2023/11/30/17866942.html
-Advertisement-
Play Games

本文從源碼層面主要分析了線程池的創建、運行過程,通過上述的分析,可以看出當線程池中的線程數量超過核心線程數後,會先將任務放入等待隊列,隊列放滿後當最大線程數大於核心線程數時,才會創建新的線程執行。 ...


我們知道如果程式中併發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束時,會因為頻繁創建線程而大大降低系統的效率,因此出現了線程池的使用方式,它可以提前創建好線程來執行任務。本文主要通過java的ThreadPoolExecutor來查看線程池的內部處理過程。

1 ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,下麵我們來看一下ThreadPoolExecutor類的部分實現源碼。

1.1 構造方法

ThreadPoolExecutor類提供瞭如下4個構造方法

// 設置線程池時指定核心線程數、最大線程數、線程存活時間及等待隊列。
// 線程創建工廠和拒絕策略使用預設的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

// 設置線程池時指定核心線程數、最大線程數、線程存活時間、等待隊列及線程創建工廠 
// 拒絕策略使用預設的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

// 設置線程池時指定核心線程數、最大線程數、線程存活時間、等待隊列及拒絕策略
// 線程創建工廠使用預設的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
   }
// 設置線程池時指定核心線程數、最大線程數、線程存活時間、等待隊列、線程創建工廠及拒絕策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通過觀察上述每個構造器的源碼實現,我們可以發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

下麵解釋一下構造器中各個參數的含義:

  • corePoolSize:核心池的線程個數上線,在創建了線程池後,預設情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務。預設情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中。
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示線上程池中最多能創建多少個線程。
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。預設情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0。
  • unit:參數keepAliveTime的時間單位。
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響;
  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略。有以下四種取值:ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重覆此過程)ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。

1.2 核心方法

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終調用的還是execute()方法。

 public void execute(Runnable command) {
        // 判斷提交的任務command是否為null,若是null,則拋出空指針異常;
        if (command == null)
            throw new NullPointerException();
        // 獲取線程池中當前線程數
        int c = ctl.get();
        // 如果線程池中當前線程數小於核心池大小,進入if語句塊
        if (workerCountOf(c) < corePoolSize) {
            // 如果以給定的命令啟動一個核心線程執行任務成功,直接返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果線程池不處於運行狀態並且移除剛加入的任務成功則執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果當前線程數為0,則線上程池裡增加一個線程,保證隊列里的任務不會沒有線程執行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } 
        // 嘗試啟動核心線程之外的線程,如果不滿足,則執行對應的拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

主要方法addWorker。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果線程池狀態大於SHUTDOWN或者線程池狀態等於SHUTDOWN,firstTask不等於null
            // 或者線程池狀態等於SHUTDOWN,任務隊列等於空時,直接返回false結束。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果線程數量大於等於最大數量或者大於等於上限
                //(入參core傳true,取核心線程數,否則取最大線程數),直接返回false結束。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false
                // CAS操作給工作線程數加1,成功則跳到retry處,不再進入迴圈。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果線程池狀態與剛進入時不一致,則跳到retry處,再次進入迴圈
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建一個線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    int rs = runStateOf(ctl.get());
                    // 如果線程池狀態在SHUTDOWN之前或者
                    // 線程池狀態等於SHUTDOWN並且firstTask等於null時,進入處理。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果要執行的線程正在運行,則拋異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 啟動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果線程添加失敗,則將新增的對應信息刪除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1.3 任務執行run方法

在上述addWorker中,當調用線程的start方法啟動線程後,會執行其中的run方法。

public void run() {
            runWorker(this);
        }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果任務不為空或者新獲取到的任務不為空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 當線程池狀態,大於等於 STOP 時,保證工作線程都有中斷標誌。
                // 當線程池狀態,小於STOP時,保證工作線程都沒有中斷標誌。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

2 整體處理過程

通過上述源碼分析,我們可以得出線程池處理任務的過程如下:

3 總結

本文從源碼層面主要分析了線程池的創建、運行過程,通過上述的分析,可以看出當線程池中的線程數量超過核心線程數後,會先將任務放入等待隊列,隊列放滿後當最大線程數大於核心線程數時,才會創建新的線程執行。

作者:京東物流 管碧強

來源:京東雲開發者社區 自猿其說Tech 轉載請註明來源


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • C++ 中要在一個函數內返回不同類型的值,你可以使用 C++17 引入的 std::variant 或 std::any,或者使用模板和多態。下麵將分別介紹這些方法。 方法一:使用 std::variant std::variant 允許你在一個函數內返回不同類型的值,但它要求所有可能的返回類型都在 ...
  • 主要探討了SpringMVC中的流程跳轉和不同形式的控制器之間的跳轉方式。首先回顧了JavaWeb中流程跳轉的核心代碼和頁面跳轉方式,並展示了在Web.xml中添加Servlet以及執行這些方式的示例。隨後,介紹了Spring MVC中的四種跳轉形式,包括控制器到JSP頁面的forward和redi... ...
  • 寫在前面 先吐槽兩句,搞個mysql安裝配置弄了4個小時,怎麼都是外網無法訪問,我靠,我特麽也是服了。 當然,後來我投降了,明天再說,學什麼不是學,娘的,換個方向,狀態依然在! Sijax是什麼? 代表 Simple Ajax ,它是一個 Python / jQuery 庫,使用 jQuery.aj ...
  • Sun公司提供了JavaMail用來實現郵件發送,但是配置煩瑣,Spring中提供了JavaMailSender用來簡化郵件配置,Spring Boot則提供了MailSenderAutoConfiguration對郵件的發送做了進一步簡化。 v準備工作 開通POP3/SMTP服務或者IMAP/SM ...
  • C語言分支結構詳解 1. if 語句 在本篇博客文章中,我們將深入探討C語言中的if語句及其相關用法。if語句是一種用於條件判斷的分支語句,它允許我們根據條件的真假來執行不同的代碼塊。 1.1 if 語句的基本語法和用法 if語句的基本語法如下所示: if (條件) { // 條件為真時執行的代碼塊 ...
  • CF786 我不會告訴你鏈接在圖片里 CF786A CF786A題意 給出一個大小為 \(n\) 的環,點順時針從 \(1\to n\) 編號,兩個人(設為 \(0,1\))輪流移動其中的一個棋子。 對於第 \(opt\) 人,他能夠將這個棋子順時針移動 \(x\in S_{opt}\)(\(S_{ ...
  • public class RandomNickName { public enum Gender{ MAN, WOMAN, UNKNOWN, ; } public static void main(String[] args) { String nickName = nickName(Gender. ...
  • 1 簡介 任務是需要資源(CPU 時間、記憶體、存儲、網路帶寬等)在指定時間內完成的一段計算工作。 通過智能地將資源分配給任務以滿足任務級和系統級目標的系統稱為任務調度程式。 任務調度程式: 及時決定和分配資源給任務的過程稱為任務調度。 當我們在 Facebook 發表評論時。我們不會讓評論發佈者等待 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...