記一次線程池任務執行異常 一個名為 fetch 線程池負責從Redis中讀取文本數據,將讀取到的文本數據提交給另一個線程池 tw ,將 tw 線程池將任務通過HTTP請求的形式上報給過濾服務。如下圖所示: 一開始採用預設線程池配置方式: 然後只提交三個任務 ,startService() 是個 以 ...
記一次線程池任務執行異常
一個名為 fetch- 線程池負責從Redis中讀取文本數據,將讀取到的文本數據提交給另一個線程池 tw-,將 tw- 線程池將任務通過HTTP請求的形式上報給過濾服務。如下圖所示:
一開始採用預設線程池配置方式:
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 20);
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fetch-%d").build();
private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, nThreads, 1, TimeUnit.HOURS,
taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
然後只提交三個任務startService()
,startService() 是個while(true)
以 pipeline 形式不停地從redis上讀取文本數據。程式運行一段時間之後,就卡死了。
//nThreads 是 3
for(int i = 0; i < nThreads; i++) {
executorService.execute(()->{
startService();
});
}
}
看CPU、記憶體以及程式的GC日誌,都是正常的。sudo jstack -l pid
發現:
"fetch-26" #109 prio=5 os_prio=0 tid=0x00007fbfe00db000 nid=0xea76 waiting on condition [0x00007fc127bfc000]
"fetch-25" #108 prio=5 os_prio=0 tid=0x00007fbfec03c000 nid=0xea75 waiting on condition [0x00007fc1257dc000]
"fetch-24" #107 prio=5 os_prio=0 tid=0x00007fbf6c001000 nid=0xea74 waiting on condition [0x00007fc127cfd000]
執行從redis中讀取文本任務的fetch- 線程池中的所有線程都阻塞了。由於提交的是Runnable任務,引用《Java併發編程實戰》第七章中一段話:
導致線程提前死亡的最主要的原因是RuntimeException。由於這些異常表示出現了某種錯誤或者其他不可修複的錯誤,因此它們通常不會被捕獲。它們不會在調用棧中逐層傳遞,而是預設地在控制臺中輸出棧追蹤信息,並終止線程
When a thread exits due to an uncaught exception, the JVM reports this event to our UncaughtExceptionHandler, otherwise the default handler just prints the stack trace to standard error.
因此,我就自定義一個UncaughtExceptionHandler
看看到底出現了什麼錯誤:
public class FetchTextExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(FetchTextExceptionHandler.class);
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("fetch redis text exception,thread name:{},msg:{}", t.getName(), e.getMessage());
}
}
UncaughtExceptionHandler
只是簡單地記錄日誌,先找到出錯原因再說。重新發版,上線一段時間後發現出現程式卡死了,這次有了異常日誌:
2018-11-23 23:10:25.681 ERROR 29818 --- [fetch-0] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-0,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:25.686 ERROR 29818 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:27.429 ERROR 29818 --- [fetch-1] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-1,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
一看這個日誌有點奇怪,fetch線程只是讀取redis上的文本數據,並將文本數據封裝到一個Runnable任務裡面提交給 Tw- 線程池,Tw-線程 才是發送HTTP POST 請求將數據提交給過濾服務。
於是去檢查創建Tw-線程池創建代碼:發現了Tw-線程池採用的是CallerRunsPolicy
延遲策略。
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build();
private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS,
taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
也就是說:當Fecth-線程提交任務過快時,Tw-線程池的taskQueue
滿了,CallerRunsPolicy
讓任務回退到調用者,任務由fetch-線程來執行了。因此,上面的日誌列印出來的是線程名字是fetch:thread name:fetch-0,msg.....
再引用一段話:
調用者運行策略(Caller-Runs)實現了一種調節機制,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。它不會線上程池(這裡的線程池是 tw-線程池)的某個線程中執行新提交的任務,而是在一個調用了execute的線程(fetch 線程)中執行該任務【fetch 線程執行execute 向 tw-線程池提交任務】
知道了異常出現的原因,於是我就把 tw-線程的 飽和策略從原來的CallerRunsPolicy
修改成AbortPolicy
,再重新運行程式,一段時間後,發現 tw-線程池中的30個線程全部阻塞,fetch-線程池中的三個線程也全部阻塞。如下圖:
在程式中每個tw-線程隔20ms發送一次HTTP POST請求,將文本上報給過濾服務,30個tw-線程,併發量大約是1500次每秒,每次提交的數據不超過30KB吧。
看程式輸出的log日誌:30個tw- 線程 都是一樣的異常SocketTimeoutException,Read timed out
。
2018-11-24 09:16:47.885 ERROR 9765 --- [tw-310] c.y.t.a.s.ReportTwExceptionHandler : http request report tw exception,thread name:tw-310,cause:java.net.SocketTimeoutException: Read timed out,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
而3個 fetch-線程的異常日誌是:rejected from java.util.concurrent.ThreadPoolExecutor
2018-11-24 09:04:36.758 ERROR 9765 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:Task ReportTwAuditService$$Lambda$75/259476123@7376559c rejected from java.util.concurrent.ThreadPoolExecutor@75fa1939[Running, pool size = 30, active threads = 30, queu
ed tasks = 50000, completed tasks = 20170]
這是因為 fetch-線程向 tw-線程池提交任務,而tw-線程池上面的飽和策略已經改成了AbortPolicy
,當tw-線程池任務隊列滿了時,tw-線程就把 fetch-線程 提交過來的任務給拒絕了,並向fetch-線程拋出RejectedExecutionException 異常。
總結一下就是:30個tw-線程因為發送HTTP POST請求給過濾服務出現 SocketTimeoutException,Read timed out
全部阻塞,而tw-線程池的飽和策略是AbortPolicy,
即:丟棄任務並拋出RejectedExecutionException 異常,導致 fetch 線程阻塞,且提交給tw-線程池的任務被 abort,這就是上面那張圖中所有線程都全部阻塞的原因。
再引用一段話:
工作線程在執行一個任務時被阻塞,如果等待用戶的輸入數據,但是用戶一直不輸入數據,導致這個線程一直被阻塞。這樣的工作線程名存實亡,它實際上不執行任何任務了。如果線程池中的所有線程都處於這樣的狀態,那麼線程池就無法加入新的任務了。各種類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執行一項任務,而在任務完成後該線程卻沒有返回池時,會發生這種情況。發生線程泄漏的一種情形出現在任務拋出一個 RuntimeException 或一個 Error 時。如果池類沒有捕捉到它們,那麼線程只會退出而線程池的大小將會永久減少一個。當這種情況發生的次數足夠多時,線程池最終就為空,而且系統將停止,因為沒有可用的線程來處理任務。
既然tw-線程發送HTTP請求出現了 SocketTImeoutException,那麼來看看HTTP連接池的配置:
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.TimeUnit;
/**
* Created by Administrator on 2018/7/4.
* 配置 RestTemplate 連接池
*/
@Configuration
public class RestTemplateConfig {
/**
* https://stackoverflow.com/questions/44762794/java-spring-resttemplate-sets-unwanted-headers
* set http header explicitly: "Accept-Charset": "utf-8"
*/
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate(httpRequestFactory());
for (HttpMessageConverter converter : restTemplate.getMessageConverters()) {
if (converter instanceof StringHttpMessageConverter) {
((StringHttpMessageConverter)converter).setWriteAcceptCharset(false);
}
}
return restTemplate;
}
@Bean
public ClientHttpRequestFactory httpRequestFactory() {
return new HttpComponentsClientHttpRequestFactory(httpClient());
}
@Bean
public HttpClient httpClient() {
//配置http長連接
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(30, TimeUnit.SECONDS);
connectionManager.setMaxTotal(1000);
connectionManager.setDefaultMaxPerRoute(20);
RequestConfig requestConfig = RequestConfig.custom()
//伺服器返回數據(response)的時間,超過該時間拋出read timeout
.setSocketTimeout(5000)
//連接上伺服器(握手成功)的時間,超出該時間拋出connect timeout
.setConnectTimeout(5000)
//從連接池中獲取連接的超時時間,超過該時間未拿到可用連接拋出異常
.setConnectionRequestTimeout(1000).build();
return HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).
setConnectionManager(connectionManager).
setConnectionManagerShared(true)
//keep alive
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
}
}
到這裡就大概知道解決方案了:
讓弱雞的過濾服務牛B一點,這有點不太可能的。你懂的……
控制HTTP 請求速度,並添加線程拋出異常時處理方法(自定義ThreadPoolExecutor,重寫afterExecute方法)而不僅僅是實現UncaughtExceptionHandler,簡單地列印出異常日誌。
fetch-線程 阻塞的原因是因為:向tw-線程池提交任務,而tw-線程池採用的飽和策略是
AbortPolicy
,如果把它改成:DiscardPolicy
直接丟棄任務而不拋出異常。這樣fetch-線程就不會收到RejectedExecutionException 異常而阻塞了。當然了,採用DiscardPolicy飽和
策略的話,fetch-線程提交任務出現異常就無法感知了,這時我們還可以自定義飽和策略。如下:可以簡單地列印出一個日誌:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author xxx
* @date 2018/11/24
*/
public class ReportTwRejectExceptionHandler implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(ReportTwRejectExceptionHandler.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.error("fetch thread submit task rejected. {}",executor.toString());
}
}
再修改一下 tw-線程池的配置參數:
//使用我們自定義的飽和策略 RejectedExecutionHandler,當有線程提交任務給tw-線程池時,若出現錯誤會列印日誌
private RejectedExecutionHandler rejectedExecutionHandler = new ReportTwRejectExceptionHandler();
private ReportTwExceptionHandler exceptionHandler = new ReportTwExceptionHandler();
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build();
//隊列長度為50000
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 10 * 5);
//maximumPoolSize=30
private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS,
taskQueue, threadFactory, rejectedExecutionHandler);
這樣,如果fetch線程向 tw-線程池提交任務出錯,就會收到如下日誌了:
2018-11-24 17:05:45.511 ERROR 38161 --- [fetch-0] c.y.t.a.e.ReportTwRejectExceptionHandler : fetch thread submit task rejected.java.util.concurrent.ThreadPoolExecutor@41a28fcd[Running, pool size = 30, active threads = 30, queued tasks = 50000, completed tasks = 110438]
其他一些:
private static final int nThreads = Runtime.getRuntime().availableProcessors();//24
返回的是邏輯cpu的個數。
~$ cat /proc/cpuinfo| grep "processor"| wc -l
24
在一臺機器上開多少個線程合適?有個公式\[N_{threads}=N_{cpu}*U_{cpu}*(1+\frac{W}{C})\]
W是等待時間、C是使用CPU的計算時間。因此,需要估計任務的類型,是計算密集型,還是IO密集型?另外:一臺物理機上不僅僅是你寫的程式在上面跑,還有其他人寫的程式也在上面跑,因此,在使用這個公式計算線程數目時也要註意到這一點。
如果任務之間是異構的且獨立的,兩種不同類型的任務,那麼可以使用2個線程池來執行這些任務。比如一個線程池執行CPU密集型任務,另一個線程池執行IO密集型任務。
參考資料
原文:https://www.cnblogs.com/hapjin/p/10012435.html