記一次線程池任務執行異常

来源:https://www.cnblogs.com/hapjin/archive/2018/11/24/10012435.html
-Advertisement-
Play Games

記一次線程池任務執行異常 一個名為 fetch 線程池負責從Redis中讀取文本數據,將讀取到的文本數據提交給另一個線程池 tw ,將 tw 線程池將任務通過HTTP請求的形式上報給過濾服務。如下圖所示: 一開始採用預設線程池配置方式: 然後只提交三個任務 ,startService() 是個 以 ...


記一次線程池任務執行異常

一個名為 fetch- 線程池負責從Redis中讀取文本數據,將讀取到的文本數據提交給另一個線程池 tw-,將 tw- 線程池將任務通過HTTP請求的形式上報給過濾服務。如下圖所示:

一開始採用預設線程池配置方式:

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 20);    
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fetch-%d").build();
    private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, nThreads, 1, TimeUnit.HOURS,
            taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

然後只提交三個任務startService(),startService() 是個while(true)以 pipeline 形式不停地從redis上讀取文本數據。程式運行一段時間之後,就卡死了。

//nThreads 是 3  
for(int i = 0; i < nThreads; i++) {
        executorService.execute(()->{
            startService();
        });
    }
    }

看CPU、記憶體以及程式的GC日誌,都是正常的。sudo jstack -l pid發現:

"fetch-26" #109 prio=5 os_prio=0 tid=0x00007fbfe00db000 nid=0xea76 waiting on condition [0x00007fc127bfc000]
"fetch-25" #108 prio=5 os_prio=0 tid=0x00007fbfec03c000 nid=0xea75 waiting on condition [0x00007fc1257dc000]
"fetch-24" #107 prio=5 os_prio=0 tid=0x00007fbf6c001000 nid=0xea74 waiting on condition [0x00007fc127cfd000]

執行從redis中讀取文本任務的fetch- 線程池中的所有線程都阻塞了。由於提交的是Runnable任務,引用《Java併發編程實戰》第七章中一段話:

導致線程提前死亡的最主要的原因是RuntimeException。由於這些異常表示出現了某種錯誤或者其他不可修複的錯誤,因此它們通常不會被捕獲。它們不會在調用棧中逐層傳遞,而是預設地在控制臺中輸出棧追蹤信息,並終止線程

When a thread exits due to an uncaught exception, the JVM reports this event to our UncaughtExceptionHandler, otherwise the default handler just prints the stack trace to standard error.

因此,我就自定義一個UncaughtExceptionHandler看看到底出現了什麼錯誤:

public class FetchTextExceptionHandler implements Thread.UncaughtExceptionHandler {
    private static final Logger logger = LoggerFactory.getLogger(FetchTextExceptionHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("fetch redis text exception,thread name:{},msg:{}", t.getName(), e.getMessage());
    }
}

UncaughtExceptionHandler只是簡單地記錄日誌,先找到出錯原因再說。重新發版,上線一段時間後發現出現程式卡死了,這次有了異常日誌:

2018-11-23 23:10:25.681 ERROR 29818 --- [fetch-0] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-0,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:25.686 ERROR 29818 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:27.429 ERROR 29818 --- [fetch-1] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-1,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out

一看這個日誌有點奇怪,fetch線程只是讀取redis上的文本數據,並將文本數據封裝到一個Runnable任務裡面提交給 Tw- 線程池,Tw-線程 才是發送HTTP POST 請求將數據提交給過濾服務。

於是去檢查創建Tw-線程池創建代碼:發現了Tw-線程池採用的是CallerRunsPolicy延遲策略。

    private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build();
    private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS,
            taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

也就是說:當Fecth-線程提交任務過快時,Tw-線程池的taskQueue滿了,CallerRunsPolicy讓任務回退到調用者,任務由fetch-線程來執行了。因此,上面的日誌列印出來的是線程名字是fetch:thread name:fetch-0,msg.....

再引用一段話:

調用者運行策略(Caller-Runs)實現了一種調節機制,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。它不會線上程池(這裡的線程池是 tw-線程池)的某個線程中執行新提交的任務,而是在一個調用了execute的線程(fetch 線程)中執行該任務【fetch 線程執行execute 向 tw-線程池提交任務】

知道了異常出現的原因,於是我就把 tw-線程的 飽和策略從原來的CallerRunsPolicy修改成AbortPolicy,再重新運行程式,一段時間後,發現 tw-線程池中的30個線程全部阻塞,fetch-線程池中的三個線程也全部阻塞。如下圖:

在程式中每個tw-線程隔20ms發送一次HTTP POST請求,將文本上報給過濾服務,30個tw-線程,併發量大約是1500次每秒,每次提交的數據不超過30KB吧。

看程式輸出的log日誌:30個tw- 線程 都是一樣的異常SocketTimeoutException,Read timed out

2018-11-24 09:16:47.885 ERROR 9765 --- [tw-310] c.y.t.a.s.ReportTwExceptionHandler : http request report tw exception,thread name:tw-310,cause:java.net.SocketTimeoutException: Read timed out,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out

而3個 fetch-線程的異常日誌是:rejected from java.util.concurrent.ThreadPoolExecutor

2018-11-24 09:04:36.758 ERROR 9765 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:Task ReportTwAuditService$$Lambda$75/259476123@7376559c rejected from java.util.concurrent.ThreadPoolExecutor@75fa1939[Running, pool size = 30, active threads = 30, queu
ed tasks = 50000, completed tasks = 20170]

這是因為 fetch-線程向 tw-線程池提交任務,而tw-線程池上面的飽和策略已經改成了AbortPolicy,當tw-線程池任務隊列滿了時,tw-線程就把 fetch-線程 提交過來的任務給拒絕了,並向fetch-線程拋出RejectedExecutionException 異常。

總結一下就是:30個tw-線程因為發送HTTP POST請求給過濾服務出現 SocketTimeoutException,Read timed out全部阻塞,而tw-線程池的飽和策略是AbortPolicy,即:丟棄任務並拋出RejectedExecutionException 異常,導致 fetch 線程阻塞,且提交給tw-線程池的任務被 abort,這就是上面那張圖中所有線程都全部阻塞的原因。

再引用一段話:

工作線程在執行一個任務時被阻塞,如果等待用戶的輸入數據,但是用戶一直不輸入數據,導致這個線程一直被阻塞。這樣的工作線程名存實亡,它實際上不執行任何任務了。如果線程池中的所有線程都處於這樣的狀態,那麼線程池就無法加入新的任務了。各種類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執行一項任務,而在任務完成後該線程卻沒有返回池時,會發生這種情況。發生線程泄漏的一種情形出現在任務拋出一個 RuntimeException 或一個 Error 時。如果池類沒有捕捉到它們,那麼線程只會退出而線程池的大小將會永久減少一個。當這種情況發生的次數足夠多時,線程池最終就為空,而且系統將停止,因為沒有可用的線程來處理任務。

既然tw-線程發送HTTP請求出現了 SocketTImeoutException,那麼來看看HTTP連接池的配置:

import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/7/4.
 * 配置 RestTemplate 連接池
 */
@Configuration
public class RestTemplateConfig {

    /**
     *  https://stackoverflow.com/questions/44762794/java-spring-resttemplate-sets-unwanted-headers
     *  set http header explicitly: "Accept-Charset": "utf-8"
     */
    @Bean
    public RestTemplate restTemplate() {
    RestTemplate restTemplate = new RestTemplate(httpRequestFactory());
    for (HttpMessageConverter converter : restTemplate.getMessageConverters()) {
        if (converter instanceof StringHttpMessageConverter) {
        ((StringHttpMessageConverter)converter).setWriteAcceptCharset(false);
        }
    }
    return restTemplate;
    }

    @Bean
    public ClientHttpRequestFactory httpRequestFactory() {
    return new HttpComponentsClientHttpRequestFactory(httpClient());
    }

    @Bean
    public HttpClient httpClient() {
    //配置http長連接
    PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(30, TimeUnit.SECONDS);
    connectionManager.setMaxTotal(1000);
    connectionManager.setDefaultMaxPerRoute(20);
    RequestConfig requestConfig = RequestConfig.custom()
            //伺服器返回數據(response)的時間,超過該時間拋出read timeout
            .setSocketTimeout(5000)
            //連接上伺服器(握手成功)的時間,超出該時間拋出connect timeout
            .setConnectTimeout(5000)
            //從連接池中獲取連接的超時時間,超過該時間未拿到可用連接拋出異常
            .setConnectionRequestTimeout(1000).build();

    return HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).
            setConnectionManager(connectionManager).
            setConnectionManagerShared(true)
            //keep alive
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();
    }
}

到這裡就大概知道解決方案了:

  • 讓弱雞的過濾服務牛B一點,這有點不太可能的。你懂的……

  • 控制HTTP 請求速度,並添加線程拋出異常時處理方法(自定義ThreadPoolExecutor,重寫afterExecute方法)而不僅僅是實現UncaughtExceptionHandler,簡單地列印出異常日誌。

  • fetch-線程 阻塞的原因是因為:向tw-線程池提交任務,而tw-線程池採用的飽和策略是AbortPolicy,如果把它改成:DiscardPolicy直接丟棄任務而不拋出異常。這樣fetch-線程就不會收到RejectedExecutionException 異常而阻塞了。當然了,採用DiscardPolicy飽和策略的話,fetch-線程提交任務出現異常就無法感知了,這時我們還可以自定義飽和策略。如下:可以簡單地列印出一個日誌:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author xxx
 * @date 2018/11/24
 */
public class ReportTwRejectExceptionHandler implements RejectedExecutionHandler {
    private static final Logger logger = LoggerFactory.getLogger(ReportTwRejectExceptionHandler.class);
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        logger.error("fetch thread submit task rejected. {}",executor.toString());
    }
}

再修改一下 tw-線程池的配置參數:

//使用我們自定義的飽和策略 RejectedExecutionHandler,當有線程提交任務給tw-線程池時,若出現錯誤會列印日誌
private RejectedExecutionHandler rejectedExecutionHandler = new ReportTwRejectExceptionHandler();
private ReportTwExceptionHandler exceptionHandler = new ReportTwExceptionHandler();
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build();
    //隊列長度為50000
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 10 * 5);
    //maximumPoolSize=30
    private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS,
            taskQueue, threadFactory, rejectedExecutionHandler);

這樣,如果fetch線程向 tw-線程池提交任務出錯,就會收到如下日誌了:

2018-11-24 17:05:45.511 ERROR 38161 --- [fetch-0] c.y.t.a.e.ReportTwRejectExceptionHandler : fetch thread submit task rejected.java.util.concurrent.ThreadPoolExecutor@41a28fcd[Running, pool size = 30, active threads = 30, queued tasks = 50000, completed tasks = 110438]

其他一些:

private static final int nThreads = Runtime.getRuntime().availableProcessors();//24

返回的是邏輯cpu的個數。

~$ cat /proc/cpuinfo| grep "processor"| wc -l
24

在一臺機器上開多少個線程合適?有個公式\[N_{threads}=N_{cpu}*U_{cpu}*(1+\frac{W}{C})\]

W是等待時間、C是使用CPU的計算時間。因此,需要估計任務的類型,是計算密集型,還是IO密集型?另外:一臺物理機上不僅僅是你寫的程式在上面跑,還有其他人寫的程式也在上面跑,因此,在使用這個公式計算線程數目時也要註意到這一點。

如果任務之間是異構的且獨立的,兩種不同類型的任務,那麼可以使用2個線程池來執行這些任務。比如一個線程池執行CPU密集型任務,另一個線程池執行IO密集型任務。

參考資料

原文:https://www.cnblogs.com/hapjin/p/10012435.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 微服務的目標是通過將應用程式分解為可以獨立部署的小型自治服務來提高應用程式版本的速度。微服務架構也帶來了一些挑戰,這些模式可以幫助緩解這些挑戰。設計模式(design pattern)是對軟體設計中普遍存在(反覆出現)的各種問題,所提出的解決方案。當然微服務中的雲設計模式也是對微服務中普遍存在的問題... ...
  • 一.在Servlet中,表單提交的非字元串類型需要手動轉換 1.在struts中,表單提供的常見數據類型struts框架自動轉換,無需手動轉換 2.在某些情況下,某些自定義類型struts不能完成自動轉換,需要進行手動轉換,如果需要轉換的類型轉換頻率較高時,手動轉換的代碼增多,這時可以使用strut ...
  • 撩課Java+系統架構 視頻 點擊開始學習 81.Servlet的會話機制? 82.Filter是什麼?有什麼作用? 83.Listener是什麼?有什麼作用? 84.你瞭解過Servlet3.0嗎? 85.JSP和Servlet有哪些相同點和不同點? ...
  • 2018-11-24 22:57:33 問題說明 最近看到Spring事務,在學習過程中遇到一個很苦惱問題 搭建好Spring的啟動環境後出現了一點小問題 在啟動時候卻出現[java.lang.NullPointerException] 不過因為當時一個小小的疏忽 很low的問題 請往下看 ... ...
  • 從高位開始逐位輸出一個整數的各位數字:輸入一個整數,從高位開始逐位分割並輸出它的各位數字。 include int main(void) { int i,j,m,n,s,k,a,b=0; scanf("%d",&m); k=m; do { m=m/10; b++; }while(m!=0); for ...
  • 多線程 等待一次性事件 packaged_task用法 背景:不是很明白,不知道為瞭解決什麼業務場景,感覺std::asynck可以優雅的搞定一切,一次等待性事件,為什麼還有個packaged_task。 用法:和std::async一樣,也能夠返回std::future,通過調用get_futur ...
  • 轉載請註明出處: https://www.cnblogs.com/funnyzpc/p/9501376.html ``` 我先閑扯下,前天(也就是2018年11月16號)的某個時候,忽然有人在QQ上私聊我,一看是公司群以為是有人來慰問新人了,也沒弄清楚身份就調侃起來,就這樣: 問題是:我竟傻乎乎滴沒 ...
  • 1.引入依賴 2.配置信息: 3.es配置啟動類: 4.操作工具類: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...