如何讓ThreadPoolExecutor更早地創建非核心線程

来源:https://www.cnblogs.com/jrjrzivvv/archive/2020/04/28/12797310.html
-Advertisement-
Play Games

最近在項目中遇到一個需要用線程池來處理任務的需求,於是我用 來實現,但是在實現過程中我發現提交大量任務時它的處理邏輯是這樣的(提交任務還有一個 方法內部也調用了 方法): java public void execute(Runnable command) { if (command == null ...


最近在項目中遇到一個需要用線程池來處理任務的需求,於是我用ThreadPoolExecutor來實現,但是在實現過程中我發現提交大量任務時它的處理邏輯是這樣的(提交任務還有一個submit方法內部也調用了execute方法):

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

註釋中已經寫的非常明白:

  1. 如果線程數量小於corePoolSize,直接創建新線程處理任務
  2. 如果線程數量等於corePoolSize,嘗試將任務放到等待隊列里
  3. 如果等待隊列已滿,嘗試創建非核心線程處理任務(如果maximumPoolSIze > corePoolSize

但是在我的項目中一個線程啟動需要10s左右的時間(需要啟動一個瀏覽器對象),因此我希望實現一個更精細的邏輯提升資源的利用率:

  1. 線程池保持corePoolSize個線程確保有新任務到來時可以立即得到執行
  2. 當沒有空閑線程時,先把任務放到等待隊列中(因為開啟一個線程需要10s,所以如果在等待隊列比較小的時候,等待其他任務完成比等待新線程創建更快)
  3. 當等待隊列的大小大於設定的閾值threshold時,說明堆積的任務已經太多了,這個時候開始創建非核心線程直到線程數量已經等於maximumPoolSize
  4. 當線程數量已經等於maximumPoolSize,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)
  5. 長時間空閑後退出非核心線程回收瀏覽器占用的記憶體資源

當我研究了常見的CachedThreadPoolFixedThreadPool以及嘗試自己配置ThreadPoolExecutor的構造函數後,發現無論如何都不能實現上面提到的邏輯,因為預設的實現只有在workQueue達到容量上限後才會開始創建非核心線程,因此需要通過繼承的方法實現一個新的類來完成需求。

怎麼實現在workQueue到達容量上限前就創建非核心線程?還要回顧下execute函數的代碼

					//嘗試將任務插入等待隊列,如果返回false
					//說明隊列已經到達容量上限,進入else if邏輯
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//嘗試創建非核心線程
else if (!addWorker(command, false))
    reject(command);

那麼只要改變workQueue.offer()的邏輯,線上程數量還小於maximumPoolSize的時候就返回false拒絕插入,讓線程池調用addWoker,等不能再創建更多線程時再允許添加到隊列即可。

可以通過子類重寫offer方法來實現添加邏輯的改變

@Override
public boolean offer(E e) {
    if (threadPoolExecutor == null) {
        throw new NullPointerException();
    }
    //當調用該方法時,已經確定了workerCountOf(c) > corePoolSize
    //當數量小於threshold,在隊列里等待
    if (size() < threshold) {
        return super.offer(e);
	//當數量大於等於threshold,說明堆積的任務太多,返回false
	//讓線程池來創建新線程處理
    } else {
        //此處可能會因為多線程導致錯誤的拒絕
        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
            return false;
        //線程池中的線程數量已經到達上限,只能添加到任務隊列中
        } else {
            return super.offer(e);
        }
    }
}

這樣就實現了基本實現了我需要的功能,但是在寫代碼的過程中我找到了一個可能出錯的地方:ThreadPoolExecutor線程安全的,那麼重寫的offer方法也可能遇到多線程調用的情況

//設想當poolSize = maximumPoolSize-1時,兩個任務到達此處同時返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	return false;
}

由於添加到隊列返回falseexecute方法進入到else if (!addWorker(command, false))

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	if (! isRunning(recheck) && remove(command))
		reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//添加到隊列失敗後進入addWorker方法中
else if (!addWorker(command, false))
	reject(command);
}

再來看一下addWorker方法的代碼,這裡只截取需要的一部分

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    	//兩個線程都認為還可以創建再創建一個新線程
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        //兩個線程同時調用cas方法只有一個能夠成功
        //成功的線程break retry;進入後面的創建線程的邏輯
        //失敗的線程重新回到上面的檢查並返回false
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

最終,在競爭中失敗的線程由於addWorker方法返回了false最終調用了reject(command)。在前面寫的要實現的邏輯里提到了,只有在等待隊列容量達到上限無法再插入時才拒絕任務,但是由於多線程的原因,這裡只是超過了threshold但沒有超過capacity的時候就拒絕任務了,所以要對拒絕策略的觸發做出修改:第一次觸發Reject時,嘗試重新添加到任務隊列中(不進行poolSize的檢測),如果仍然不能添加,再拒絕任務
這裡通過對execute方法進行重寫來實現重試

@Override
public void execute(Runnable command) {
    try {
        super.execute(command);
    } catch (RejectedExecutionException e) {
    	/*
    	這裡參考源碼中將任務添加到任務隊列的實現
    	但是其中通過(workerCountOf(recheck) == 0)
    	檢查當任務添加到隊列後是否還有線程存活的部分
    	由於是private許可權的,無法實現類似的邏輯,因此需要做一定的特殊處理
		if (isRunning(c) && workQueue.offer(command)) {
		     int recheck = ctl.get();
		     if (! isRunning(recheck) && remove(command))
		         reject(command);
		     else if (workerCountOf(recheck) == 0)
		         addWorker(null, false);
		 }
		*/
        if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
            if (this.isShutdown() && remove(command))
                //二次檢查
                realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失敗,隊列已經滿了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}

這裡有兩個小問題:

  1. 初始化線程池傳入的RejectedExecutionHandler不一定會拋出異常(事實上,ThreadPoolExecutor自己實現的4中拒絕策略中只有AbortPolicy能夠拋出異常並被捕捉到),因此需要在初始化父類時傳入AbortPolicy拒絕策略並將構造函數中傳入的自定義拒絕策略保存下來,在重試失敗後才調用自己的rejectedExecution
  2. corePoolSize = 0 的極端情況下,可能出現一個任務剛被插入隊列的同時,所有的線程都結束任務然後被銷毀了,此使這個被加入的任務就無法被執行,在ThreadPoolExecutor中是通過
    else if (workerCountOf(recheck) == 0)
    	addWorker(null, false);
    
    在添加後再檢查工作線程是否為0來確保任務可以被執行,但是其中使用的方法是私有的,無法在子類中實現類似的邏輯,因此在初始化時只能強制corePoolSize至少為1來解決這個問題。

全部代碼如下

public class MyThreadPool extends ThreadPoolExecutor {

    private RejectedExecutionHandler realRejectedExecutionHandler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity) {
        this(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                queueCapacity,
                new AbortPolicy());
    }

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity,
                        RejectedExecutionHandler handler) {
        super(corePoolSize == 0 ? 1 : corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new MyLinkedBlockingQueue<>(queueCapacity),
                new AbortPolicy());
        ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
        realRejectedExecutionHandler = handler;
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
                if (this.isShutdown() && remove(command))
                    //二次檢查
                    realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失敗,隊列已經滿了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}


public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private int threshold = 20;

    private ThreadPoolExecutor threadPoolExecutor = null;

    public MyLinkedBlockingQueue(int queueCapacity) {
        super(queueCapacity);
    }

    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
	public boolean offer(E e) {
	    if (threadPoolExecutor == null) {
	        throw new NullPointerException();
	    }
	    //當調用該方法時,已經確定了workerCountOf(c) > corePoolSize
	    //當數量小於threshold,在隊列里等待
	    if (size() < threshold) {
	        return super.offer(e);
		//當數量大於等於threshold,說明堆積的任務太多,返回false
		//讓線程池來創建新線程處理
	    } else {
	        //此處可能會因為多線程導致錯誤的拒絕
	        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	            return false;
	        //線程池中的線程數量已經到達上限,只能添加到任務隊列中
	        } else {
	            return super.offer(e);
	        }
	    }
	}

    public boolean offerWithoutCheck(E e) {
        return super.offer(e);
    }
}

最後進行簡單的測試

corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
任務2
線程數量:2
等待隊列大小:0
等待隊列大小小於閾值,繼續等待。
任務3
線程數量:2
等待隊列大小:1
等待隊列大小小於閾值,繼續等待。
任務4
線程數量:2
等待隊列大小:2
等待隊列大小小於閾值,繼續等待。
任務5
線程數量:2
等待隊列大小:3
等待隊列大小小於閾值,繼續等待。
任務6
線程數量:2
等待隊列大小:4
等待隊列大小小於閾值,繼續等待。
任務7
線程數量:2
等待隊列大小:5
等待隊列大小小於閾值,繼續等待。
任務8
線程數量:2
等待隊列大小:6
等待隊列大小小於閾值,繼續等待。
任務9
線程數量:2
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,創建新線程處理。
任務10
線程數量:3
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,創建新線程處理。
任務11
線程數量:4
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,創建新線程處理。
任務12
線程數量:5
等待隊列大小:7
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務13
線程數量:5
等待隊列大小:8
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務14
線程數量:5
等待隊列大小:9
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務15
線程數量:5
等待隊列大小:10
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
隊列已滿
任務16
線程數量:5
等待隊列大小:10
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
隊列已滿

再重新複習一遍要實現的功能:

  1. 線程池保持corePoolSize個線程確保有新任務到來時可以立即得到執行
  2. 當沒有空閑線程時,先把任務放到等待隊列中(因為開啟一個線程需要10s,所以如果在等待隊列比較小的時候,等待其他任務完成比等待新線程創建更快)
  3. 當等待隊列的大小大於設定的閾值threshold時,說明堆積的任務已經太多了,這個時候開始創建非核心線程直到線程數量已經等於maximumPoolSize
  4. 當線程數量已經等於maximumPoolSize,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)
  5. 長時間空閑後退出非核心線程回收瀏覽器占用的記憶體資源

可以看出,線程池運行的邏輯和要實現的目標是相同的。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 任何可以產生對象的方法或者類,都可以稱之為工廠。單例就是所謂的靜態工廠。 為什麼jdk中有了new,還需要工廠呢? a、靈活的控制生產過程 b、給對象加修飾、或者給對象加訪問許可權,或者能夠在對象生產過程中添加一些日誌信息,再或者根據應用場景添加一些實際的業務處理等等。 1、靜態工廠 單例模式:一種特 ...
  • 一、JSR 303 1、什麼是 JSR 303? JSR 是 Java Specification Requests 的縮寫,即 Java 規範提案。 存在各種各樣的 JSR,簡單的理解為 JSR 是一種 Java 標準。 JSR 303 就是數據檢驗的一個標準(Bean Validation (J ...
  • 概述 在使用Python或者其他的編程語言,都會多多少少遇到編碼錯誤,處理起來非常痛苦。在Stack Overflow和其他的編程問答網站上,UnicodeDecodeError和UnicodeEncodeError也經常被提及。本篇教程希望能幫你認識Python編碼,並能夠從容的處理編碼問題。 這 ...
  • 一丶簡介 Topic Exchange 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。 業務場景: ...
  • 場景 我們用Django的Model時,有時候需要關聯外鍵。關聯外鍵時,參數: 的幾個配置選項到底是幹嘛的呢,你知道嗎? 參數介紹 級聯刪除。Django會模擬SQL約束的行為,在刪除此條數據時,同事刪除外鍵關聯的對象。 比如:用戶的有一個外鍵關聯的是用戶的健康記錄表,當用戶刪除時,配置了這個參數的 ...
  • 【目錄】 一 IO模型介紹 二 阻塞IO(blocking IO) 三 非阻塞IO(non-blocking IO) 四 多路復用IO(IO multiplexing) 五 非同步IO(Asynchronous I/O) 六 IO模型比較分析 七 selectors模塊 本文討論的背景是Linux環境 ...
  • 一、代碼實現 1.數組轉換成List String[] deviceIdAy = buildingDto.getChannelId().split(Symbol.COMMA);//設備idList<String> deviceIdList = Arrays.asList(deviceIdAy); 2 ...
  • 目錄 pyecharts模塊 簡介 Echarts 是一個由百度開源的數據可視化,憑藉著良好的交互性,精巧的圖表設計,得到了眾多開發者的認可。而 Python 是一門富有表達力的語言,很適合用於數據處理。當數據分析遇上數據可視化時,pyecharts 誕生了。 如果想要掌握pyecharts,可以閱 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...