詳細講解線程池的使用

来源:https://www.cnblogs.com/LipeiNet/archive/2018/04/04/8708338.html
-Advertisement-
Play Games

前言:說起threadpoolexector應該大家多少都接觸過,現在我詳細的講解下其的用法 一:解析參數 為了更好地理解threadpoolexecutor,我先講一個例子,話說一個工作多年的高T,一天突然決定自己要單干組織一個團隊,經過仔細的考慮他做出瞭如下的決定 1、團隊的核心人員為10個 2 ...


      前言:說起threadpoolexector應該大家多少都接觸過,現在我詳細的講解下其的用法

一:解析參數

為了更好地理解threadpoolexecutor,我先講一個例子,話說一個工作多年的高T,一天突然決定自己要單干組織一個團隊,經過仔細的考慮他做出瞭如下的決定

1、團隊的核心人員為10個

2、如果一旦出現項目過多人員不足的時候,則會聘請5個外包人員

3、接的項目單子最多堆積100

4、如果項目做完了團隊比較空閑,則裁掉外包人員,空閑的時間為一個月

5、如果接的單子超過100個,則後續考慮一些兜底策略(比如拒絕多餘的單子,或者把多出100個以外的單子直接交付第三方公司做)

6、同時他還考慮瞭如果效益一直不好,那麼就裁掉所有人,宣佈公司直接倒閉

上面的例子恰恰和我們的線程池非常的像,我們來看下threadpoolexecutor的定義。

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

corePoolSize:核心線程數(就是我們上面說的一個公司核心人員)

maximumPoolSize:最大線程數(就是我們說的一旦公司接收的單子過多則聘請外包,此時也是公司最大的人員了,因為人多了辦公地方不夠了)

keepAliveTime:超多核心線程數之外線程的存活時間(就是如果公司一旦活不多要多久進行裁掉外包人員)

unit:上面時間的單元(可以年月日時分秒等)

workQueue:任務隊列(就是如果公司最大能存的單子)

handler:拒絕策略(就是一旦任務滿了應該如果處理多餘的單子)

allowCoreThreadTimeOut:設置是否清理核心線程(如果設置true,如果任務少於實際執行的線程則會清理核心線程,預設為false)

二:實際演練

先驗證核心線程數

public class ThreadPoolExecutorTest {
    public static void main(String[] args) throws InterruptedException {
        RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(9));
        for (int i = 0; i < 11; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
    }

    static class AppTask implements Runnable {
        private int taskNum;

        public AppTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"_task_" + this.taskNum + ":執行完畢");
        }
    }

    static LinkedBlockingQueue<Runnable> xs = new LinkedBlockingQueue(10000);
    static int i = 0;

    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                xs.put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
代碼塊1

那麼我們來看看運行的結果

我們可以看出線程池此時只有2個線程,足矣說明當隊列的數+核心線程數<=任務數,則線程池中只會有核心線程工作。

如果把上述代碼中for迴圈中最大值改為14呢,那麼我們在來看運行的結果

我們可以看的出線程池中的線程已經是5個了,說明當任務>隊列最大值+核心線程數的時候線程池則會生成新的線程來處理任務。

上面我們基本弄明白了核心線程數,最大線程數這些概念,現在我們再來看2個比較重要的參數隊列和拒絕策略

隊列

1、直接提交(SynchronousQueue,不會保存任何任務)

2、無界隊列(LinkedBlockingQueue,對於新加來的任務全部存入隊列中,量大可能會導致oom)

3、有界隊列(ArrayBlockingQueue,隊列有一個最大值,超過最大值的任務交給拒絕策略處理)

拒絕策略

1、當線程池中的數量等於最大線程數時對於新傳入的任務則拋異常(AbortPolicy)

2、當線程池中的數量等於最大線程數時拋棄消息,不做任務處理(DiscardPolicy)

3、當線程池中的數量等於最大線程數時主線程處理新傳入的任務消息(CallerRunsPolicy)

4、當線程池中的數量等於最大線程數時 、拋棄線程池中最後一個要執行的任務,並執行新傳入的任務(DiscardOldestPolicy)

隊列這裡就不多說了大家可以自己去實踐,這裡我們主要說拒絕策略,現線上程池預設的是第一種拒絕策略,直接拋異常,第二種不做處理的這種我們也不提了,我們主要來看下第三種和第四種拒絕策略,先看下第三種拒絕策略的源碼

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
代碼塊2

從中我們可以看出新傳入的任務並沒有交接線程池來處理,直接交給主線程來處理的,看我下麵這塊代碼以及執行結果

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
代碼塊3

運行結果

從結果顯示對於多出隊列的任務則由主線程來執行,主線程執行完畢後,由於隊列被釋放了一些任務,新來的任務又會交給線程池來處理。

第四種情況我們可以看下源碼

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
代碼塊4

我們可以看出先執行新進入的任務,然後將隊列頭的任務拋棄,這樣會導致一部分消息丟失

 三:使用場景

1、非同步處理日誌,這個是比較場景的採用線程池來解決的

2、定時任務,定時對資料庫備份、定時更新redis配置等,定時發送郵件

3、數據遷移

這些常見的一些場景我們就應該優先來考慮線程池來解決

四:堵塞線程池和非堵塞線程池

這裡我說下通過日誌處理方式來講解堵塞線程池和非堵塞線程池.日誌的作用便於我們分析問題,但是對於服務本身而言卻不是必須的,這也是為什麼我們一般都會非同步來處理日誌的情況。

1、堵塞線程池

在上面幾種幾種隊列中,如果我們選取有有界隊列來,拒絕策略可以採用CallerRunsPolicy,這樣一來就不會出現消息丟失、記憶體溢出等問題,當然我們也可以重寫拒絕策略,我們來看下麵一段代碼

 RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),handler);
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }

 static int count=0;
    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
                count++;
                System.out.println("阻塞隊列中個數:"+count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
代碼塊5

運行結果如下

我們可以看出所有的任務都執行完畢了,因為拒絕策略中我們把新進入的任務在次放入隊列中,我們用put這個方法,這個是java提供給我們的阻塞隊列,如果滿了就會一直等待直到隊列中其他任務被釋放。

優點:

1、不會造成記憶體溢出

2、不會出現消息丟失

3、對消費者來說並不需要非常複雜的處理就能滿足業務需求

缺點:

對生成者來說就要變得複雜的多,如果消息量qps過高消費者消費能力不足(如果消費者不足以處理生產者的任務則會堵塞等待,那麼生產者肯定得不到響應,會報出超時異常),如果不採用一些措施將會導致消息丟失,所以對生成者來說必須要持久化的記錄消息,而且設置最大量,如果出現超過最大量的80%則報警,所以在設計生產者的時候必須考慮這樣的場景,

總結:

堵塞線程池其實就是把消費者消費能力不足的壓力交給生產者來處理

2、非堵塞線程池

非堵塞線程池說白就是消息來了就處理,處理不足則進行存儲,或者記錄日誌,我們看如下的代碼

RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), handler);
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        MainThread.exec();
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
 static LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue(10000);
    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                blockingQueue.put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static int count=0;
    static class MainThread {
        public static void exec() throws InterruptedException {
            while (true) {
                if (blockingQueue.size() >= 8000) {
                    System.out.println("報警");
                }
                Runnable r = blockingQueue.poll();
                if (r == null) {
                    Thread.sleep(1000);
                    continue;
                }
                count++;
                r.run();
                System.out.println("非堵塞線程池執行的個數:"+count);
            }
        }
    }
代碼塊6

執行結果

 有人問和上面有什麼區別呢,其實區別就是這個不會出現堵塞,這裡我雖然採用堵塞隊列來存儲,為了更好的展示,其實這裡你可以列印日誌,或者存入redis中然後進行處理。

優點:

1、也不會造成記憶體溢出

2、消費不會出現堵塞

缺點:

這樣設計明顯會複雜的很多,而且獲取消息值不易,就我目前來看我更傾向於採用堵塞線程池

五:線程池的設計

線程池的設計最重要的一點就是計算出生產者最大的qps量和單個線程消費的能力,比如我們生產者qps是1萬,但是我們單個線程處理每個任務的時間是2毫秒,如果我們cpu是4核,那麼我們核心線程是4*2=8個,所以我們每秒處理的任務數是1000/2*8=4000,很顯然我們的消費能力遠遠不足,這個時候我們應該考慮採用多台機器處理,有人不是可以堵塞隊列麽,其實那是一種兜底策略,避免消息丟失,但這並不是我們設計的核心。如果我們能計算出單條消息的大小(如1k)我們分配這個消息服務的記憶體是300M,那麼我們可以做個折中150M來存儲多餘的消息,那麼可以存儲的量是1百萬,如果我們的流量高峰是30分鐘,每秒處理剩餘的消息是200,那麼這半小時之內總共剩餘的消息總量是30*60*200=360000,這樣一來完全可以滿足我們的業務需求。

六:線程池的有點

1、減少頻繁的創建和銷毀線程(由於線程創建和銷毀都會耗用一定的記憶體)

2、線程池也是多線程,充分利用CPU,提高系統的效率

七:Executors下的4種創建線程池的方式

1、newSingleThreadExecutor();

這個線程池的特點是核心線程數只有一個,最大線程數也只有一個,採用的隊列是無界隊列,可能會導致記憶體溢出

2、newFixedThreadPool(5)

這個線程池特點是核心線程數由自己設置並且一旦設置就是固定的不在改變,採用的隊列是無界隊列,可能會導致記憶體溢出

3、newCachedThreadPool()

這個線程池特點是核心線程數為0,最大線程數無界,這樣也會造成記憶體溢出

4、newScheduledThreadPool(5)

這個線程池特點是可以周期性執行任務,核心線程數需要自己進行初始化,最大線程數無界,這樣也會造成記憶體溢出

我們在實際操作中,儘量避免使用Executors來創建多線程,因為如果消息量過大會導致記憶體溢出,消息丟失

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ...
  • 目前知道的迭代器有兩種: 一種是調用方法直接返回的; 一種是可迭代對象通過執行iter方法得到的。 迭代器的好處是可以節省記憶體。 如果在某些情況下,我們也需要節省記憶體,就只能自己寫。自己寫的這個能實現迭代器功能的東西就叫生成器。 ...
  • 1.寫函數,返回一個撲克牌列表,裡面有52項,每一項是一個元組 例如:[(‘紅心’,2),(‘草花’,2), …(‘黑桃’,‘A’)] 2.寫函數,傳入n個數,返回字典{‘max’:最大值,’min’:最小值} 例如:min_max(2,5,7,8,4) 返回:{‘max’:8,’min’:2} 3 ...
  • 常用模塊 1 logging模塊 日誌級別:Noset (不設置) Debug (調試信息) 也可用10表示 Info--(消息信息) 也可用20表示 Warning (警告信息) 也可用30表示 Error (錯誤消息) 也可用40表示 Critical (嚴重錯誤) 也可用50表示 預設級別是W ...
  • 裝飾器本質上就是一個python函數,他可以讓其他函數在不需要做任何代碼變動的前提下,增加額外的功能,裝飾器的返回值也是一個函數對象。 裝飾器的應用場景:比如插入日誌,性能測試,事務處理,緩存等等場景。 ...
  • 1 學習計劃 1、業務受理需求分析 n 業務通知單 n 工單 n 工作單 2、創建業務受理環節的數據表 n 業務通知單 n 工單 n 工作單 3、實現業務受理自動分單 n 在CRM服務端擴展方法根據手機號查詢客戶信息 n 在CRM服務端擴展方法根據取件地址查詢定區id n 調整業務受理頁面回顯客戶信 ...
  • 5-2 5-6 5-8 5-9 5-11 6-1 6-5 6-9 ...
  • 上一節跟大家講了Python的列表,當然不是完整的講完,後續我們還會提到,這一節我們還是來講Python的數據類型 首先要講到的就是元組 元組其實擁有列表的一些特性,可以存儲不同類型的值,但在某些方面元組又比不上列表 定義一個元組,你可以不用加‘ [ ] ’,你只需用逗號隔開即可 例如 元組也能被嵌 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...