還不懂Java線程池實現原理,看這一篇文章就夠了

来源:https://www.cnblogs.com/yidengjiagou/archive/2022/11/16/16895061.html
-Advertisement-
Play Games

線程池無論是工作還是面試都是必備的技能,但是很多人對於線程池的實現原理卻一知半解,並不瞭解線程池內部的工作原理,今天一燈就帶大家一塊剖析線程池底層實現原理。 ...


線程池無論是工作還是面試都是必備的技能,但是很多人對於線程池的實現原理卻一知半解,並不瞭解線程池內部的工作原理,今天一燈就帶大家一塊剖析線程池底層實現原理。

1. 為什麼要使用線程池

使用線程池通常由以下兩個原因:

  1. 頻繁創建銷毀線程需要消耗系統資源,使用線程池可以復用線程。

  2. 使用線程池可以更容易管理線程,線程池可以動態管理線程個數、具有阻塞隊列、定時周期執行任務、環境隔離等。

2. 線程池的使用

/**
 * @author 一燈架構
 * @apiNote 線程池示例
 **/
public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                3,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
      
        // 2. 往線程池中提交3個任務
        for (int i = 0; i < 3; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 關註公眾號:一燈架構");
            });
        }
      
        // 3. 關閉線程池
        threadPoolExecutor.shutdown();
    }
}

輸出結果:

pool-1-thread-2 關註公眾號:一燈架構
pool-1-thread-1 關註公眾號:一燈架構
pool-1-thread-3 關註公眾號:一燈架構

線程池的使用非常簡單:

  1. 調用new ThreadPoolExecutor()構造方法,指定核心參數,創建線程池。
  2. 調用execute()方法提交Runnable任務
  3. 使用結束後,調用shutdown()方法,關閉線程池。

再看一下線程池構造方法中核心參數的作用。

3. 線程池核心參數

線程池共有七大核心參數:

參數名稱 參數含義
int corePoolSize 核心線程數
int maximumPoolSize 最大線程數
long keepAliveTime 線程存活時間
TimeUnit unit 時間單位
BlockingQueue workQueue 阻塞隊列
ThreadFactory threadFactory 線程創建工廠
RejectedExecutionHandler handler 拒絕策略
  1. corePoolSize 核心線程數

    當往線程池中提交任務,會創建線程去處理任務,直到線程數達到corePoolSize,才會往阻塞隊列中添加任務。預設情況下,空閑的核心線程並不會被回收,除非配置了allowCoreThreadTimeOut=true。

  2. maximumPoolSize 最大線程數

    當線程池中的線程數達到corePoolSize,阻塞隊列又滿了之後,才會繼續創建線程,直到達到maximumPoolSize,另外空閑的非核心線程會被回收。

  3. keepAliveTime 線程存活時間

    非核心線程的空閑時間達到了keepAliveTime,將會被回收。

  4. TimeUnit 時間單位

    線程存活時間的單位,預設是TimeUnit.MILLISECONDS(毫秒),可選擇的有:

    TimeUnit.NANOSECONDS(納秒)

    TimeUnit.MICROSECONDS(微秒)

    TimeUnit.MILLISECONDS(毫秒)

    TimeUnit.SECONDS(秒)

    TimeUnit.MINUTES(分鐘)

    TimeUnit.HOURS(小時)

    TimeUnit.DAYS(天)

  5. workQueue 阻塞隊列

    當線程池中的線程數達到corePoolSize,再提交的任務就會放到阻塞隊列的等待,預設使用的是LinkedBlockingQueue,可選擇的有:

    LinkedBlockingQueue(基於鏈表實現的阻塞隊列)

    ArrayBlockingQueue(基於數組實現的阻塞隊列)

    SynchronousQueue(只有一個元素的阻塞隊列)

    PriorityBlockingQueue(實現了優先順序的阻塞隊列)

    DelayQueue(實現了延遲功能的阻塞隊列)

  6. threadFactory 線程創建工廠

    用來創建線程的工廠,預設的是Executors.defaultThreadFactory(),可選擇的還有Executors.privilegedThreadFactory()實現了線程優先順序。當然也可以自定義線程創建工廠,創建線程的時候最好指定線程名稱,便於排查問題。

  7. RejectedExecutionHandler 拒絕策略

    當線程池中的線程數達到maximumPoolSize,阻塞隊列也滿了之後,再往線程池中提交任務,就會觸發執行拒絕策略,預設的是AbortPolicy(直接終止,拋出異常),可選擇的有:

    AbortPolicy(直接終止,拋出異常)

    DiscardPolicy(默默丟棄,不拋出異常)

    DiscardOldestPolicy(丟棄隊列中最舊的任務,執行當前任務)

    CallerRunsPolicy(返回給調用者執行)

4. 線程池工作原理

線程池的工作原理,簡單理解如下:

  1. 當往線程池中提交任務的時候,會先判斷線程池中線程數是否是核心線程數,如果小於,會創建核心線程並執行任務。
  2. 如果線程數大於核心線程數,會判斷阻塞隊列是否已滿,如果沒有滿,會把任務添加到阻塞隊列中等待調度執行。
  3. 如果阻塞隊列已滿,會判斷線程數是否小於最大線程數,如果小於,會繼續創建最大線程數並執行任務。
  4. 如果線程數大於最大線程數,會執行拒絕策略,然後結束。

5. 線程池源碼剖析

5.1 線程池的屬性

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 線程池的控制狀態,Integer長度是32位,前3位用來存儲線程池狀態,後29位用來存儲線程數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 線程個數所占的位數
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 線程池的最大容量,2^29-1,約5億個線程
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // 獨占鎖,用來控制多線程下的併發操作
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作線程的集合
    private final HashSet<Worker> workers = new HashSet<>();
    // 等待條件,用來響應中斷
    private final Condition termination = mainLock.newCondition();
    // 是否允許回收核心線程
    private volatile boolean allowCoreThreadTimeOut;
    // 線程數的歷史峰值
    private int largestPoolSize;

    /**
     * 以下是線程池的七大核心參數
     */
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private volatile long keepAliveTime;
    private final BlockingQueue<Runnable> workQueue;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;

}

線程池的控制狀態ctl用來存儲線程池狀態和線程個數,前3位用來存儲線程池狀態,後29位用來存儲線程數量。

設計者多聰明,用一個變數存儲了兩塊內容。

5.2 線程池狀態

線程池共有5種狀態:

狀態名稱 狀態值 狀態含義 狀態作用
RUNNING 111 運行中 線程池創建後預設狀態,接收新任務,並處理阻塞隊列中的任務。
SHUTDOWN 000 已關閉 調用shutdown方法後處於該狀態,不再接收新任務,處理阻塞隊列中任務。
STOP 001 已停止 調用shutdownNow方法後處於該狀態,不再新任務,並中斷所有線程,丟棄阻塞隊列中所有任務。
TIDYING 010 處理中 所有任務已完成,所有工作線程都已回收,等待調用terminated方法。
TERMINATED 011 已終止 調用terminated方法後處於該狀態,線程池的最終狀態。

5.3 execute源碼

看一下往線程池中提交任務的源碼,這是線程池的核心邏輯:

// 往線程池中提交任務
public void execute(Runnable command) {
    // 1. 判斷提交的任務是否為null
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 2. 判斷線程數是否小於核心線程數
    if (workerCountOf(c) < corePoolSize) {
        // 3. 把任務包裝成worker,添加到worker集合中
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 4. 判斷如果線程數不小於corePoolSize,並且可以添加到阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {
        // 5. 重新檢查線程池狀態,如果線程池不是運行狀態,就移除剛纔添加的任務,並執行拒絕策略
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 6. 判斷如果線程數是0,就創建非核心線程(任務是null,會從阻塞隊列中拉取任務)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 7. 如果添加阻塞隊列失敗,就創建一個Worker
    else if (!addWorker(command, false))
        // 8. 如果創建Worker失敗說明已經達到最大線程數了,則執行拒絕策略
        reject(command);
}

execute方法的邏輯也很簡單,最終就是調用addWorker方法,把任務添加到worker集合中,再看一下addWorker方法的源碼:

// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 檢查是否允許提交任務
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;
        // 2. 使用死迴圈保證添加線程成功
        for (; ; ) {
            int wc = workerCountOf(c);
            // 3. 校驗線程數是否超過容量限制
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 4. 使用CAS修改線程數
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // 5. 如果線程池狀態變了,則從頭再來
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 6. 把任務和新線程包裝成一個worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 7. 加鎖,控制併發
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 8. 再次校驗線程池狀態是否異常
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    // 9. 如果線程已經啟動,就拋出異常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 10. 添加到worker集合中
                    workers.add(w);
                    int s = workers.size();
                    // 11. 記錄線程數歷史峰值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 12. 啟動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

方法雖然很長,但是邏輯很清晰。就是把任務和線程包裝成worker,添加到worker集合,並啟動線程。

5.4 worker源碼

再看一下worker類的結構:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // 工作線程
    final Thread thread;
    // 任務
    Runnable firstTask;

    // 創建worker,並創建一個新線程(用來執行任務)
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

5.5 runWorker源碼

再看一下run方法的源碼:

// 線程執行入口
public void run() {
    runWorker(this);
}

// 線程運行核心方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 1. 如果當前worker中任務是null,就從阻塞隊列中獲取任務
        while (task != null || (task = getTask()) != null) {
            // 加鎖,保證thread不被其他線程中斷(除非線程池被中斷)
            w.lock();
            // 2. 校驗線程池狀態,是否需要中斷當前線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 3. 執行run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 解鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 4. 從worker集合刪除當前worker
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法邏輯也很簡單,就是不斷從阻塞隊列中拉取任務並執行。

再看一下從阻塞隊列中拉取任務的邏輯:

// 從阻塞隊列中拉取任務
private Runnable getTask() {
    boolean timedOut = false;
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 如果線程池已經停了,或者阻塞隊列是空,就回收當前線程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 2. 再次判斷是否需要回收線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 3. 從阻塞隊列中拉取任務
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

6. 總結

今天帶大家一塊詳細剖析了Java線程池的實現原理,是不是非常簡單?

幾百行的方法雖然看著複雜,令人頭疼,只要由淺入深的梳理清理業務邏輯,源碼讀起來也是小菜一碟。

我是「一燈架構」,如果本文對你有幫助,歡迎各位小伙伴點贊、評論和關註,感謝各位老鐵,我們下期見

image


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我國目前並未出台專門針對網路爬蟲技術的法律規範,但在司法實踐中,相關判決已屢見不鮮,K 哥特設了“K哥爬蟲普法”專欄,本欄目通過對真實案例的分析,旨在提高廣大爬蟲工程師的法律意識,知曉如何合法合規利用爬蟲技術,警鐘長鳴,做一個守法、護法、有原則的技術人員。 案情介紹 2018年1月至7月期間,咼某興 ...
  • 大家好,我是棧長。 今天給大家通報一則框架更新消息,時隔兩個月,Spring Cloud 2021.0.5 最新版發佈了,來看下最新的 Spring Cloud 版本情況: Spring Cloud 無疑是現在 Java 微服務事實上的標準,完全基於 Spring Boot 構建,依賴 Spring ...
  • 目錄 一.freeglut 簡介 二.freeglut 下載 五.猜你喜歡 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> OpenGL ES 特效 零基礎 ...
  • 類模板=>實力化=>模板類 通過類模板實現棧,點擊查看代碼 #include <iostream> #include <cstring> using namespace std; template<typename T> //template<typename T=int> 也可以這樣寫,寫個預設類 ...
  • ##SpringBoot集成JWT(極簡版) ###在WebConfig配置類中設置介面統一首碼 import org.springframework.context.annotation.Configuration; import org.springframework.web.bind.anno ...
  • 這一篇問文章主要介紹元組的相關知識。 元組:不可修改的序列 與列表一樣,元組也是序列,唯一的差別在於元組是不能修改的(同樣的,字元串也不能修改)。 元組的語法很簡單。 >>> >>> 1, 2, 3 (1, 2, 3) >>> (1, 2, 3) (1, 2, 3) >>> >>> () () >> ...
  • 大家好,我是三友~~ 之前有小伙伴私信我說看源碼的時候感覺源碼很難,不知道該怎麼看,其實這有部分原因是因為沒有弄懂一些源碼實現的套路,也就是設計模式,所以本文我就總結了9種在源碼中非常常見的設計模式,併列舉了很多源碼的實現例子,希望對你看源碼和日常工作中有所幫助。 單例模式 單例模式是指一個類在一個 ...
  • 目錄 一.glut 簡介 二.猜你喜歡 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> OpenGL ES 特效 零基礎 OpenGL ES 學習路線推薦 : ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...