突然想看看線程池

来源:https://www.cnblogs.com/zhangweicheng/archive/2019/12/02/11954794.html
-Advertisement-
Play Games

1 為何要適用線程池 首先我們知道線程對於操作系統來說是一種 珍貴的資源 ,像我們如果每次使用到的時候手動創建,線程執行完 方法後又自動關閉,下次用的時候還得手動創建,這樣無論對於操作系統還是我們來說都是一種 時間 和 資源 的浪費,所以我們可以選擇維護一些線程,這些線程在執行完任務之後繼續執行其他 ...


1 為何要適用線程池

  首先我們知道線程對於操作系統來說是一種珍貴的資源,像我們如果每次使用到的時候手動創建,線程執行完run()方法後又自動關閉,下次用的時候還得手動創建,這樣無論對於操作系統還是我們來說都是一種時間資源的浪費,所以我們可以選擇維護一些線程,這些線程在執行完任務之後繼續執行其他收到的任務,從而實現資源的復用,這些線程就構成了平時說的線程池。其能帶來許多的好處,比如:

  • 實現線程資源復用。減少手動關閉線程的資源浪費。

  • 一定幅度提升響應速度。線上程池承受範圍內(指還能接收任務的狀態)線程可以直接使用,而不用進行手動創建。

  • 方便線程的管理。把線程集中在一起可以讓我們統一的設置其狀態或者超時的時間等,從而達到我們預期的狀態,此外還能減少OOM的發生,比如說如果我們因為一些失誤操作而導致在某個地方不斷的創建線程,那麼會導致系統奔潰,但是如果使用線程池我們可以設定同時執行任務的線程上限和能接收的最大任務數,可以很好的避免這種情況。(當然這是建立在你的最大線程數和任務數處於合理的範圍內)

我們知道線上程的生命周期中(關於線程生命周期可以看看我的另一篇文章——Java線程狀態和關閉線程的正確姿勢),線程正常執行完run()方法就結束進入終止狀態了,那麼線程池是如何實現一個線程在執行完一個任務之後不進入死亡而繼續執行其他任務的呢

2 線程池是如何實現線程復用的

  其實大家大概都能猜到其內部肯定是使用一個while迴圈來不斷獲取任務執行,那我們來看看其內部大概是如何實現的,首先看下execute()方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  
    // 這些花里胡哨的可以不用管,大概邏輯就下方3.1節的線程池工作流程,關鍵的是找到線程的啟動方法start()
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 其線程的啟動方法就放在這裡的addWorker方法中
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
   // some code....
}

點進去我們可以看到其實現邏輯,這裡刪減一些邏輯以便更加清晰的看清楚:

private boolean addWorker(Runnable firstTask, boolean core) {
    // some code...
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // code...
            if (workerAdded) {
                /*
                 * 可以看到線程是從這裡開始啟動的,我們找到t的根源,發現是Worker中的thread對象
                 * 而我們傳進來的執行也被傳入worker中
                 */
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // ...
    }
    return workerStarted;
}

再跟進worker的構造方法中,看到thread是使用線程工廠創建的,而創建方法則是把自身傳了進去(內部類Worker實現了Runnable方法)

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

也就是說threadrunnable對象就是Worker本身,調用thread.start()方法會調用Workerrun()方法,那麼在上方使用t.start()啟動線程後就會調用Workerrun()方法,讓我們來看下Workerrun()方法做了什麼。

// 調用runWorker()
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        try {
            // while迴圈不斷的從隊列中獲取任務執行,直到滿足條件退出迴圈
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                    // 預設空實現,可以重寫此方法以便線上程執行前執行一些操作
                    beforeExecute(wt, task);
                    try {
                        // 直接調用task的run方法,而task就是我們傳進來的runnable
                        task.run();
                    } catch (Exception x) {
                        thrown = x; throw x;
                    }  finally {
                        // 同上,鉤子方法
                        afterExecute(task, thrown);
                    }
                } 
            }
         // other code
    }

 okay,到這裡我們就知道了線程池是如何實現線程執行完任務復用的,跟我們一開始想的差不多就是使用一個while迴圈不斷從隊列中獲取任務,顯式的調用任務的run()方法,直到沒有隊列為空(或者其他錯誤因素而退出迴圈)。

3 線程池是如何工作的

3.1 線程池的工作流程

首先先上一張線程池的工作圖,根據工作圖來理解線程池的工作流程,記住這張工作圖對理解線程池有很好的幫助。

1575038980538

  1. 在執行Executor.execute(runnable)或者submit(runnable/callable)的時候,檢查此時線程池中的線程數量是否達到核心線程數,如果還沒有,則創建'核心線程'執行任務。(可以理解為線上程池中分為'核心線程'和'最大線程'兩種種類的線程,'最大線程'在空閑一段時間之後就自己關閉,'核心線程'則會一直嘗試獲取工作)
  2. 如果達到核心線程數,那麼檢查隊列是否已滿,如果沒滿,則將任務放入隊列中等待消費。(線上程池中任務和線程不會直接交互,一般都會維護一個阻塞隊列,任務來的時候嘗試放入隊列中,而線程則是統一從隊列中拿取任務執行)
  3. 如果隊列已滿,那麼檢查線程數量是否達到最大線程數,如果沒有的話則創建'最大線程'執行任務,否則的話則執行拒絕策略。

3.2 如何創建一個線程池

  我們先通過較底層的一個類ThreadPoolExecutor來創建。

public class Test {

    /* -----為了便於理解,可以把線程池中的類型分成兩類——[核心線程]和[最大線程]------ */
    /* -----可以直接看main方法的例子,再上來看這裡參數的註釋方便理解------ */    

    /**
     * 核心線程數,線程池的核心線程到達這個數值之後接收任務便不再創建線程,
     * 而是放入隊列等待消費,直到隊列填滿
     */
    private static final int CORE_POOL_SIZE = 1;

    /**
     * 最大線程數,當隊列被填滿時再接收新的任務的時候就會創建'最大線程'來緩解壓力,
     * '最大線程'在空閑一段時間後會消亡,具體的空閑時間取決於下方的KEEP_ALIVE_TIME,
     * '最大線程'達到這個數值後便不再創建,舉個例子,核心線程數為1,最大線程數為2,
     * 那麼核心線程的數量最多為1,'最大線程'的數量最多為1(最大線程數-核心線程數)
     */
    private static final int MAXIMUM_POOL_SIZE = 2;

    /** 最大線程的空閑時間,'最大線程'空閑時間達到這個數值時消亡,時間單位為下個參數TimeUnit*/
    private static final int KEEP_ALIVE_TIME = 60;

    /** 空閑時間的計量單位*/
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    /**
     * 任務隊列,為阻塞隊列,阻塞隊列的特點是
     *      1.調用take()方法時,若隊列為空則進入阻塞狀態而不是返回空
     *      2.調用put()方法時,若隊列已滿則進入阻塞狀態
     * 阻塞隊列可以分為數組隊列和鏈表隊列(區別大概就是List和Linked的區別),可以通過設定
     * 邊界值的方式來決定隊列中最多可以容納多少任務,如果超出則創建最大線程或者採取拒絕策略
     * 如果設定了邊界值則為有界隊列,否則則為無界隊列(無界隊列容易引起OOM,隊列的大小應根據需求制定)
     */
    private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new ArrayBlockingQueue<>(1);

    /** 線程工廠,由該工廠產生執行任務的線程*/
    private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();

    /**
     * 拒絕策略,當已達到最大線程並且隊列已滿的時候對新來任務的處理措施,分為四種,由ThreadPoolExecutor內部類實現
     *      1、ThreadPoolExecutor.CallerRunsPolicy 當前線程來執行其任務,也就是說調用executor.execute()的線程執行,
                                                   而不是線程池額外提供線程執行
     *      2、ThreadPoolExecutor.AbortPolicy 直接拋出RejectedExecutionException異常。
     *      3、ThreadPoolExecutor.DiscardPolicy 直接丟棄任務,不會對新來的任務進行任何處理,也不會得到任何反饋。
     *      4、ThreadPoolExecutor.DiscardOldestPolicy 丟棄隊列中最老的任務(指的是隊列的第一個任務,調用poll()方法將其丟棄),
                                                      然後重新調用executor.execute()方法
     */
    private static final RejectedExecutionHandler REJECTED_EXECUTION_HANDLER =  new ThreadPoolExecutor.AbortPolicy();

    public static void main(String[] args) throws InterruptedException {
        // 創建一個主要線程數為1,最大線程數為2,隊列大小為1的線程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TIME_UNIT,
                BLOCKING_QUEUE,
                THREAD_FACTORY,
                REJECTED_EXECUTION_HANDLER);
        // 此時線程池中沒有任何線程,直接創建一個主要線程來執行
        executor.execute(() -> {
            try {
                System.err.println("execute thread1:" + Thread.currentThread().getName());
                // 睡眠1秒,驗證下方的線程放入了隊列中而不是再次創建線程
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 此時主要線程數已經達到最大,新來的任務放入隊列中
        executor.execute(() -> {
            try {
                System.err.println("execute thread2:" + Thread.currentThread().getName());
                // 睡眠1秒
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 再次接收任務時,由於隊列滿了嘗試創建最大線程數來執行
        executor.execute(() -> {
            try {
                System.err.println("execute thread3:" + Thread.currentThread().getName());
                // 睡眠1秒
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 線程池已經處於飽和狀態,再次來任務時採取拒絕策略,這裡採取的是直接報錯
        executor.execute(() -> {
            try {
                System.err.println("execute thread4:" + Thread.currentThread().getName());
                // 睡眠1秒
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

結果圖:

1575288958805

  從執行順序可以看出是1->3->2沒有4,也就很好的再現了3.1節的線程池工作的流程,線程池創建核心線程執行任務,核心線程數量達到上限(這裡為1)後將新來的任務放入隊列(這裡大小為1),隊列滿了,嘗試創建最大線程執行任新的任務(這個例子也表明瞭最大線程執行的是新來的任務,而不是在隊列頭的任務),此時線程池已經飽和了,如果再來新任務根據拒絕策略響應處理,這裡選擇報錯,所以不會執行任務4。

​  這裡由於篇幅原因就不舉例其他拒絕策略了,想要驗證可以自己在本機上測試一下。

3.3 簡單聊聊Executors常見的線程池

  在JUC包中提供了一個Executors框架,裡面提供了快速創建五種線程池的方式——newFixedThreadPool()newSingleThreadExecutor()newCachedThreadPool()newScheduledThreadPool()newWorkStealingPool()(Java1.8之後新增,這裡暫不介紹,後續補充)。

  • newFixedThreadPool(int n):創建一個核心線程數和最大線程數都為n的線程池,也就是說,線程池中只有核心線程,不會有最大線程最大線程的容量=最大線程數-核心線程數),其使用的隊列為LinkedBlockingQueue無界隊列,意味著不會拋棄任何任務,也就有可能發生OOM,這是其一大缺點。

    ​ 這些瞭解一下就行,並不需要記甚至可以忘記,前四種快速創建線程池的實質都是使用ThreadPoolExecutor實現,只不過其參數不同罷了,其實現如下,可以看到這些參數決定了它的性質

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
  • newSingleThreadExecutor():實現方面沒什麼好說的,就是上述線程池的n為1,但是唯一不同的是其實現外面又包了一層。

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

 這一層的作用就是重寫finalize()方法,方便JVM回收的時候保證關閉線程池,關於finalizeJVM回收的可以看下我之前的另一篇 JVM垃圾回收的文章。

// 具體實現
static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
    
        // 重寫了finalize()方法,保證線程池回收的時候先執行完剩下的任務
        protected void finalize() {
            super.shutdown();
        }
    }
  • newCachedThreadPool():緩存隊列,沒有核心線程,最大線程數為Integer.MAX_VALUE,並且採用比較特殊的SynchronousQueue隊列,這種隊列實質不會占據存儲空間,在任務提交到線程池中,如果沒有任務接收任務,那麼此時就會進入阻塞狀態,同理線程從隊列拿任務也是一樣,不過這裡的最大線程數為Integer.MAX_VALUE,所以每次來新任務,如果沒有空閑線程就會一直創建線程,也有可能導致OOM
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      // 同步隊列
                                      new SynchronousQueue<Runnable>());
    }
  • newScheduledThreadPool(int n):定時線程池,核心線程數為n,最大線程數為Integer.MAX_VALUE,採用延時隊列DelayedWorkQueue實現,非常適合定時任務的實現。

     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

  在《阿裡巴巴開放規範》中也說道不允許使用Executors創建線程池,原因上面也大致說了,沒有好好的把控資源可能導致OOM的情況。

【強制】線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

4 總結

  在日常的需求中我們難免會遇到需要多個任務同時進行的情況,這時就避免不了使用線程,如果還是使用傳統的方法手動創建線程的話,那麼於我們應用於系統而言都是一種資源浪費,所以我們可以考慮維護一些固定的線程重覆利用,這些線程就構成了一個線程池,有效的管理這些線程可以減少損耗並且一定程度提升響應速度。

  我們知道線程在調用start()方法執行完之後就會消亡,那麼線程池是如何實現一直執行任務的呢?我們在execute()方法中發現了其實現的關鍵是內部類WorkerWorker是一個內部類,實現了Runnable介面,線上程工廠創建線程的時候會將Worker自身傳進去,那麼線程調用start()方法的時候必定會調用Workerrun()方法,而在Workerrun()方法中,則是採用一個while迴圈不斷的從隊列中獲取任務,然後顯示的執行任務的run()方法,從而實現一個線程執行多個任務。

  接著我們使用圖解直觀的看到線程池的工作流程,並使用一小段代碼來解釋ThreadPoolExecutor的各項參數的意義所在,並簡單的模擬了整個工作流程。

  最後講了幾種快速創建線程池的方法,其本質都是調用ThreadPoolExecutor的構造方法,只是參數的不同決定了他們的不同性質,所以ThreadPoolExecutor才是根本,其他的看看就行。同時也講明Executors創建線程池有出現OOM的隱患,所以建議使用ThreadPoolExecutor來創建。

若文章有誤,希望大家幫忙指出。

即使昧著良心我也要說,"Java是世界上最好的語言。"


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 工作這麼久了,對於Java中時間日期的操作一直很蛋疼,一會用Date,一會用Calendar一會用LocalDateTime,始終沒有認真總結過它們的聯繫與區別。迷迷糊糊用了好幾年了,今天終於搞清楚了! ...
  • 函數式編程思想概述 在數學中,函數就是有輸入量、輸出量的一套計算方案,也就是“拿什麼東西做什麼事情”。相對而言,面向對象過 分強調“必須通過對象的形式來做事情”,而函數式思想則儘量忽略面向對象的複雜語法——強調做什麼,而不是以 什麼形式做。 面向對象的思想: 做一件事情,找一個能解決這個事情的對象, ...
  • 在閱讀《阿裡巴巴Java開發手冊》時,發現有一條關於在 foreach 迴圈里進行元素的 remove/add 操作的規約,具體內容如下: 錯誤演示 我們首先在 IDEA 中編寫一個在 foreach 迴圈里進行 remove 操作的代碼: 此時執行代碼,編譯正確,執行成功!輸出 [wupx, hu ...
  • 一、電子郵件的歷史 1.起源: 1969 Lenoard K. 教授發給同事的“LO” 1971 美國國防部自主的阿帕網(Arpanet)的通訊機制 通訊地址里用@ 1987年中國的第一份電子郵件 “Across the Great Wall we can reach every corner in ...
  • 這一切的一切都得從“Hello world”說起!!! 有很多東西在thinkPHP的官方開發文檔上其實都有講到,我在這裡只是想記錄自己每天堅持學習PHP的情況,今天接觸ThinkPHP的路由,路由這一塊可以更好的隱藏我們網站的結構,讓我們的網站更安全,這是路由帶給我們的一些好處。下麵來記錄Thin ...
  • 第1期(20191202) 文章 1. "A short guide to the structure and internals of the " Erlang distributed messaging facility. Erlang分散式啟動流程源碼閱讀指南: 節點啟動時通過 互相發現彼此。 ...
  • from tkinter import * def callback(*args): xl.set(xE.get()) print("改變的數據:",xE.get()) root = Tk() root.title("tkinter的trace()變動追蹤") xE = StringVar() en ...
  • 在workerman中會經常使用,我們先寫一個回調函數,當某個行為被觸發後使用該函數處理相關邏輯。 在PHP中最常用的幾種回調寫法如下 匿名函數做為回調 匿名函數(Anonymous functions),也叫閉包函數(closures),允許臨時創建一個沒有指定名稱的函數。最經常用作回調函數(ca ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...