頁面查詢多項數據組合的線程池設計

来源:https://www.cnblogs.com/Jcloud/archive/2023/10/20/17776555.html
-Advertisement-
Play Games

我們應對併發場景時一般會採用下麵方式去預估線程池的線程數量,但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的線程環境上也需要進行驗證和微調。比如在本文所闡述分頁查詢的數據項組合場景中。 ...


背景

我們應對併發場景時一般會採用下麵方式去預估線程池的線程數量,比如QPS需求是1000,平均每個任務需要執行的時間是t秒,那麼我們需要的線程數是t * 1000。

但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的線程環境上也需要進行驗證和微調。比如在本文所闡述分頁查詢的數據項組合場景中。

1、數據組合依賴不同的上游接介面, 它們的響應時間參差不齊,甚至差距還非常大。有些介面支持批量查詢而另一些則不支持批量查詢。有些介面因為性能問題還需要考慮降級和平滑方案。

2、為了提升用戶體驗,這裡的查詢設計了動態列,因此每一次訪問所需要組合的數據項和數量也是不同的。

因此這裡如果需要估算出一個合理的t是不太現實的。

方案

一種可動態調節的策略,根據監控的反饋對線程池進行微調。整體設計分為裝配邏輯線程池封裝設計。

1、裝配邏輯

查詢結果,拆分分片(水平拆分),並行裝配(垂直拆分),獲得裝配項列表(動態列), 並行裝配每一項。

2、線程池封裝

可調節的核心線程數、最大線程數、線程保持時間,隊列大小,提交任務重試等待時間,提交任務重試次數。 固定異常拒絕策略。

調節參數:

欄位 名稱 說明
corePoolSize 核心線程數 參考線程池定義
maximumPoolSize 最大線程數 參考線程池定義
keepAliveTime 線程存活時間 參考線程池定義
queueSize 隊列長度 參考線程池定義
resubmitSleepMillis 提交任務重試等待時間 添加任務被拒絕後重試時的等待時間
resubmitTimes 提交任務重試次數 添加任務被拒絕後重試添加的最大次數
    @Data
	private static class PoolPolicy {

		/** 核心線程數 */
		private Integer corePoolSize;

		/** 最大線程數 */
		private Integer maximumPoolSize;

		/** 線程存活時間 */
		private Integer keepAliveTime;

		/** 隊列容量 */
		private Integer queueSize;

		/** 重試等待時間 */
		private Long resubmitSleepMillis;

		/** 重試次數 */
		private Integer resubmitTimes;
	}

創建線程池:

線程池的創建考慮了動態的需求,滿足根據壓測結果進行微調的要求。首先緩存舊的線程池後再創建新的線程,當新的線程池創建成功後再去關閉舊的線程池。保證在這個替換過程中不影響正在執行的業務。線程池使用了中斷策略,用戶可以及時感知到系統繁忙並保證了系統資源占用的安全。

public void reloadThreadPool(PoolPolicy poolPolicy) {
    if (poolPolicy == null) {
        throw new RuntimeException("The thread pool policy cannot be empty.");
    }
    if (poolPolicy.getCorePoolSize() == null) {
        poolPolicy.setCorePoolSize(0);
    }
    if (poolPolicy.getMaximumPoolSize() == null) {
        poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getKeepAliveTime() == null) {
        poolPolicy.setKeepAliveTime(60);
    }
    if (poolPolicy.getQueueSize() == null) {
        poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getResubmitSleepMillis() == null) {
        poolPolicy.setResubmitSleepMillis(200L);
    }
    if (poolPolicy.getResubmitTimes() == null) {
        poolPolicy.setResubmitTimes(5);
    }
    // - 線程池策略沒有變化直接返回已有線程池。
    ExecutorService original = this.executorService;
    this.executorService = new ThreadPoolExecutor(
            poolPolicy.getCorePoolSize(),
            poolPolicy.getMaximumPoolSize(),
            poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
            new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.AbortPolicy());
    this.poolPolicy = poolPolicy;
    if (original != null) {
        original.shutdownNow();
    }
}

任務提交:

線程池封裝對象中使用的線程池拒絕策略是AbortPolicy,因此線上程數和阻塞隊列到達上限後會觸發異常。另外在這裡為了保證提交的成功率利用重試策略實現了一定程度的延遲處理,具體場景中可以結合業務特點進行適當的調節和配置。

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

監控:

1、submit提交的監控

見代碼中的「監控點①」,在submit方法中添加監控點,監控key的需要添線程池封裝對象的線程名稱首碼,用於區分具體的線程池對象。

「監控點①」用於監控添加任務的動作是否正常,以便對線程池對象及策略參數進行微調。

public <T> Future<T> submit(Callable<T> task) {
    // - 監控點①
    CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
                UmpConstant.APP_NAME,
                UmpConstant.UMP_DISABLE_HEART,
                UmpConstant.UMP_ENABLE_TP);
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        // - 監控點①
        Profiler.functionError(callerInfo);
        throw exception;
    }
    // - 監控點①
    Profiler.registerInfoEnd(callerInfo);
    return future;
}

2、線程池並行任務

見代碼的「監控點②」,分別在添加任務和任務完成後。

「監控點②」實時統計線上程中執行的總任務數量,用於評估線程池的任務的數量的滿載水平。

/** 任務並行數量統計 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(()-> {
                T rst = task.call();
                // - 監控點②
                log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());
                return rst;
            });
            // - 監控點②
            log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

3、調節

線程池封裝對象策略的調節時機

1)上線前基於流量預估的壓測階段;

2)上線後跟進監控數據和線程池中任務的滿載水平進行人工微調,也可以通過JOB在指定的時間自動調整;

3)大促前依據往期大促峰值來調高相關參數。

線程池封裝對象策略的調節經驗

1)訪問時長要求較低時,我們可以考慮調小線程數和阻塞隊列,適當調大提交任務重試等待時間和次數,以便降低資源占用。

2)訪問時長要求較高時,就需要調大線程數並保證相對較小的阻塞隊列,調小提交任務的重試等待時間和次數甚至分別調成0和1(即關閉重試提交邏輯)。

作者:京東零售 王文明

來源:京東雲開發者社區 轉載請註明來源


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 知識總覽 2.3.1 單鏈表的定義 知識總覽 單鏈表定義 #include<stdio.h> #include<string.h> #include<stdlib.h> struct LNode{ int data; struct LNode *next; }; int main(){ struct ...
  • dpkt項目是一個`Python`模塊,主要用於對網路數據包進行解析和操作。它可以處理多種協議,例如`TCP`、`UDP`、`IP`等,並提供了一些常用的網路操作功能,例如計算校驗和、解析`DNS`數據包等。由於其簡單易用的特性,`dpkt`被廣泛應用於網路安全領域,例如流量分析、漏洞利用、入侵檢測... ...
  • 一、掃雷游戲分析 關鍵步驟:兩個二維數組的大小為11*11,但實際上操作的只有中心的9*9的棋盤,創建另外兩行的原因是方便統計一個坐標周圍3*3的雷的個數1.創建兩個二維數組一個存放佈置好的雷(1號),另外一個存放空的棋盤(2號)2.選手選出來的坐標傳到1號棋盤上對坐標進行分析如果是雷就返回被炸死了 ...
  • 在上篇文章 每個後端都應該瞭解的 OpenResty 入門以及網關安全實戰 中,我向大家介紹了 OpenResty 的入門使用是 WAF 防禦實戰,這篇文章將給大家繼續介紹 OpenResty 入門之性能測試 篇。 性能測試是軟體開發中不可或缺的一環,它可以幫助我們評估系統的性能、穩定性、可擴展性等 ...
  • 來源:nginx(ID:nginx-study) Cloudflare公司去年宣佈棄用nginx,轉用自研的新一代方向代理服務Pingora,並號稱比nginx更快、更高效、更安全,下麵通過Cloudfare官方網站的一篇文章來瞭解下Pingora比Nginx強在哪裡。 簡介 今天,我們很高興有機會 ...
  • reCAPTCHA是Google公司推出的一項驗證服務,使用十分方便快捷,在國外許多網站上均有使用。它與許多其他的人機驗證方式不同,它極少需要用戶進行各種識圖驗證。 它的使用方式如下如所示,只需勾選覆選框即可通過人機驗證。 雖然簡單但效果很好,因為Google會收集一些瀏覽器信息,網路信息,滑鼠軌跡 ...
  • 簡介 Go語言中的切片(slice)是一種靈活的數據結構,它構建在數組之上並提供了方便的方式來操作數組的一部分。切片的底層實現涉及到數組和一些元數據。以下是Golang切片的底層實現的詳細介紹: 底層數組(Underlying Array): 切片是建立在一個底層數組之上的。這個數組通常比切片的容量 ...
  • 1 概覽 DataX 是一個異構數據源離線同步工具,致力於實現包括關係型資料庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。 1.1 設計理念 為瞭解決異構數據源同步問題,DataX將複雜的網狀的同步鏈路變成了星型數據鏈路 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...