前言 在我們實際工作過程中,往往會將大的任務劃分成幾個小的子任務,待所有子任務完成之後,再整合出大任務的結果.(例如: 新增直播課的場景),任務的性質通常是多種多樣的,這裡列舉一些任務的常見性質. 從資源使用的角度: CPU密集型 (枚舉素數) I/O密集型 (文件上傳下載) 從執行過程的角度: 依 ...
目錄
前言
在我們實際工作過程中,往往會將大的任務劃分成幾個小的子任務,待所有子任務完成之後,再整合出大任務的結果.(例如: 新增直播課的場景),任務的性質通常是多種多樣的,這裡列舉一些任務的常見性質.
從資源使用的角度:
- CPU密集型 (枚舉素數)
- I/O密集型 (文件上傳下載)
從執行過程的角度:
- 依賴其他有限資源(資料庫連接池,文件描述符)/不依賴其他有限資源
- 沒有返回值(寫日誌,logService,MesageService)
- 有返回值(計算結果)
- 處理過程中可能拋異常(異常要如何處理)
- 可取消的任務/不可取消的任務
從執行時間的角度:
- 執行時間短(枚舉100以內的素數)
- 執行時間長(資料庫調用)
- 永遠無法結束(爬蟲任務)
- 限時任務,需要儘快響應(H5端介面,GUI點擊事件)
- 定時任務(Job)
任務是對現實問題的抽象,其對應程式中的某些方法,而方法的執行需要調用棧.
從Java記憶體模型圖中可以看出,Java線程為任務的執行提供了所需的方法調用棧,其中包括本地方法調用棧和Java方法調用棧,在32位系統中通常占0.5M,而在64位系統中通常占1M+10幾KB的內核數據結構.
而且有的操作系統也會對一個進程能創建的線程數量進行限制. 因此我們並不能無限制的創建線程,線程是一種共用資源,需要統一維護和調度,以便使資源利用率更加高效,這便有了線程池的概念.
Java記憶體模型圖:
註: 很多服務端的應用程式比如MySQL,Web伺服器都使用了線程池技術.
然而也有例外,對於耗時較短的任務,比如僅有記憶體操作,導致線程的維護時間/任務的執行時間比值偏大,這類任務就不適合使用多線程技術,例如Redis服務.
或者對於需要確保線程安全性的,比如GUI開發包,也是使用單線程,例如Swing開發包,JavaScript等.
對於任務的執行,我們往往會有許多需求,例如觀察任務的執行狀態,獲取任務的執行結果或者執行過程中拋出的異常信息,而對於長時間執行的任務,可能還會需要暫停任務,取消任務,限時等
jdk中有許多執行時間長的任務,例如,Thread.join, Object.wait, BlockingQueue.poll,Future.get,這些任務的介面設計也體現了這些需求例如lock介面:
public void lock(); //基於狀態的介面,當無法獲取鎖時,掛起當前線程
public void lockInterruptibly() throws InterruptedException; // 通過拋出中斷異常來響應中斷
public boolean tryLock();//用於快速判斷釋放可加鎖,用於輪詢
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; //可限時的操作
下麵列舉一些任務的執行需要考慮的問題.
任務的執行策略:
- 任務在什麼線程中執行
- 任務的執行順序,FIFO還是按優先順序
- 多少個任務可以併發執行
- 任務過多導致系統過載,選擇拒絕哪個任務,即任務的拒絕策略
- 如何通知應用程式有任務被拒絕
- 如何通知應用程式任務的執行結果,包括成功的結果和失敗的結果
為了應對這些繁雜的現實需求,jdk為我們提供了Executor框架.通過這個中間人,將任務的提交和實際執行策略解耦,並且提供了對生命周期的支持(ExecutorService),客戶端只需要關註任務的構建和任務的提交,由中間人來關註實際的執行策略。從而封裝了任務執行的複雜性。
本文主要介紹Java平臺提供的Executor執行框架,其中Runable表示任務,FutureTask表示任務的執行結果,ThreadPoolExecutor表示具體的任務執行策略.
任務的描述
Executor框架中,Runable介面表示任務,但是這個任務沒有返回值且不能拋出異常,而Callable介面卻可以.所以Executors工具類提供了RunableAdapter適配器,通過callalbe(Runable)方法,將 runable轉為 callable.
任務 | 描述 |
---|---|
Runnable | 可執行的任務,無返回值,不可拋出異常 |
Callable | 可執行的任務,有返回值,可以拋出異常 |
FutureTask | 可執行的任務,可以管理任務的執行狀態和讀取任務的執行結果 |
為了對任務維護任務的運行狀態以及非同步獲取任務的運行結果,Executor框架提供了Future類,該類表示一個非同步任務的計算結果.提供了一些方法:
- 獲取任務執行結果 get(), get(long,TimeUnit)
- 取消任務 cancel(boolean)
- 判斷任務是否取消,判斷任務是否完成 isCancelled(),isDone()
同時也提供了記憶體一致性保證: Future.get()之前的操作, happen-before Future.get()之後的操作
FutureTask實現了Future介面和Runable介面,表示一個可取消的任務,AbstractExecutorService正是通過將Runable封裝成FutureTask,來管理和維護任務的狀態以及獲取任務的執行結果,下麵介紹jdk1.8中FutureTask的實現.
AbstractExecutorService:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
FutureTask的設計與實現
在jdk1.8之前的版本中為了簡介,依賴AQS來實現FutureTask,但在jdk1.8後,則放棄使用,通過WaitNode鏈表,來維護等待結果的線程.
FutureTask源碼
public class FutureTask<V> implements RunnableFuture<V> {
/*
* FutureTask是一個併發安全類,有併發控制機制
* 先前版本為了簡潔,使用AQS來實現,jdk1.8後的併發控制使用 一個state 域,通過CAS操作state,同時通過一個簡單的stack來維護 waiting threads*/
private volatile int state;
// 實際的任務
private Callable<V> callable;
// 任務的執行結果或者拋出的異常,通過get()獲取這個欄位
// 不需要 volatile,不會有併發讀寫該欄位的情況
private Object outcome;
// 在run()的時候 會set runner
private volatile Thread runner;
// 一個簡單的等待隊列,為什麼不用AQS了? AQS太重了
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread();
}
}
}
FutureTask狀態機
瞭解了一個類的狀態機,也就大致瞭解了類的工作過程,FutureTask的狀態機如下所示
狀態 | 描述 |
---|---|
NEW | 初始任務狀態 |
COMPLETING | 任務已執行完,正在設置outcome |
NORMAL | 任務正常執行完成 |
EXCEPTIONAL | 任務執行過程中拋出異常,工作線程終止 |
INTERRUPTED | 執行任務的工作線程收到中斷請求 |
思考一個問題,為什麼要有一個COMLETING中間態?
為了維護複合操作的原子性:設置outcome的值和更新任務狀態需要原子操作
protected void set(V v) {
// 通過CAS先check下,確保狀態轉換是原子op,同時也確保outcome=v 和設置狀態的值這一對複合操作的原子性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 進來後就是單線程環境了
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
FutureTask幾個關鍵方法
run(),cancel(),awaiDone()
public void run() {
// 先驗條件
if (state != NEW ||
// set runner
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 任務執行出錯
result = null;
ran = false;
setException(ex);
}
if (ran)
// 執行成功 set outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// 確保不會丟失中斷信號
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
// cancel: new -> interrupting->interrupted,或者 new -> cancelled
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// check then action, 複合操作, 如果失敗則說明此時任務的狀態不是 new了,返回false,即取消失敗
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
// 通過給 執行該任務的線程 發送中斷信號來取消任務
if (t != null)
t.interrupt();
} finally { // final state
// 發送完後預設置為 interrupted, 表示 信號已發過去了,但任務不一定能停下來,需要任務自己判斷這個信號
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 任務未完成前,通過LockSupport.park等待任務完成
s = awaitDone(false, 0L);
return report(s);
}
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
/**
* 可中斷的方法
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 可中斷方法的大部分實現,都是通過拋InterruptedException()來響應中斷,註意Thread.interrupted()會清除中斷信號
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任務 達到了終態,即isDone()了, 即 S>COMPLETIOG,返回 isDone() => s>competing
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
// 正在寫結果, 馬上就結束了
} else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
// 任務還未開始, 即 s=NEW 時,此時創建 等待線程節點,再過一次前面的操作 到下一步
q = new WaitNode();
else if (!queued)
// 新增的節點未入隊,將節點入隊,入隊成功後再過一次前面的操作 到下一步
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 如果過了等待時間了,不等了, 把前面構建的節點從等待隊列中刪除,返回 state
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 否則 限時阻塞當前線程,等待任務完成時被喚醒
LockSupport.parkNanos(this, nanos);
} else
// timed=false時, 永遠阻塞當前線程,等待任務完成時被喚醒
LockSupport.park(this);
}
}
ThreadPoolExecutor的設計與實現
簡介
ThreadPoolExecutor = ThreadPool + Executor
Executor: 其中僅有一個execute(Runable) 方法,工作模式為生產者-消費者模式,提交(submit)任務的客戶端相當於生成者,執行任務的線程(worker)則相當於消費者。
ThreadPool: 從字面意思來看是一個線程的容器,用於管理和維護工作者線程。線程池的工作與任務隊列(work queue)密切相關的,其中任務隊列保存了所有等待執行的任務。
ThreadPoolExecutor即以生產者-消費者為工作模型,基於線程池實現的執行器。
由簡介我們可以瞭解到,一般線程池的實現涉及兩個關鍵組件:work queue,workers。而在ThreadPoolExecutor的設計和實現中,其分別對應BlockingQueue介面,Worker類。
線程池的實現所需關鍵組件 | ThreadPoolExecutor |
---|---|
work queue | BlockingQueue |
worker | final class Worker extends AbstractQueuedSynchronizer implements Runnable{} |
接下來將從四個方面入手,介紹ThreadPoolExecutor的設計與實現,體會大師們(Doug Lea與JCP Expert Group)是如何思考和解決線程池問題的。
- 類的結構與狀態
- 任務的提交與調度
- 線程的創建與執行
- 服務的關閉,任務的取消,線程的回收
類的描述與狀態
在開始介紹ThreadPoolExecutor前,有必要先瞭解下Executor框架的其他組成部分。
Executor介面:框架的核心介面,其中只包含一個execute方法
public interface Executor {
/**
* @throws RejectedExecutionException if this task cannot be accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService介面:線程池作為一個服務需要有服務的狀態維護等操作,這些操作被放到了這個介面,例如shutdown(),shutdownNow(),awaitTermination(),這裡也給出了服務關閉方法。
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
AbstractExecutorService類: 提供ExecutorService的基本實現。例如:通過將任務封裝成FutureTask,實現submit方法。任務的批量執行方法:invokeAll,invokeAny的通用實現等。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/** 註意這裡已經說明瞭該介面可能會拋出此異常,但我們常常會忘記處理此異常而導致報錯,但我們卻記得NPE的處理
* @throws RejectedExecutionException
* @throws NullPointerException
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor欄位描述
ThreadPoolExecutor欄位
``` public class ThreadPoolExecutor extends AbstractExecutorService { /** * ctl(the main pool control state):用於維護了以下兩個欄位的值 * workCount: 存活著的線程數 低29位,大概5億 * runState: 高3位 線程池服務的狀態: RUNNING(-1),SHUTDOWN(0),STOP(1),TIDYING(2),TERMINATED(3) */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //如何快速獲取n個1? [(1 << n) - 1],CPACITY=29個1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 低29位 private static int workerCountOf(int c) { return c & CAPACITY; } // rs + wc => rs | wc; 加法換成位運算,更快一點 :) private static int ctlOf(int rs, int wc) { return rs | wc; } private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile boolean allowCoreThreadTimeOut; /** * idle thread的定義: waiting for work * Timeout in nanoseconds for idle threads waiting for work. * 如果線程池中運行的線程比corePoolSize大,多餘的線程會在沒有任務的時候的等待keep-alive times,如果在這個時間段內還是沒有任務執行,會回收這個線程直到corePoolSize數量(如何回收的? go processWorkerExit), * 註:(這個欄位只有當maximumPoolSize大於corePoolSize時有效) */ private volatile long keepAliveTime; // 任務隊列 private final BlockingQueueThreadPoolExecutor狀態描述
該類主要包含下麵幾個狀態:
狀態 | |
---|---|
RUNNING | Accept new tasks and process queued tasks |
SHUTDOWN | Don't accept new tasks, but process queued tasks |
STOP | Don't accept new tasks, don't process queued tasks |
TIDYING | All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING will run the terminated() hook method |
TERMINATED | terminated() has completed |
狀態間的操作:
描述 | |
---|---|
shutdown() | 僅會給idle Worker發送中斷信號,會緩慢的結束線程池服務:將queue中的任務都執行完 |
shutdownNow() | 會給所有Worker發送中斷信號,會快速的結束線程池服務(不安全): 嘗試中斷正在執行任務的線程,同時返回queue中的任務列表 |
awaitTermination() | 是一個基於狀態的方法,將在狀態達到TERMINATED時返回,可以用在需要同步判斷線程池關閉的場景 |
其餘方法 |
Worker欄位描述
點擊查看代碼
**
* 實際的工作者線程,主要維護線程的中斷狀態
* 這個類為了簡化在運行任務的時候對鎖的獲取和釋放,設計成了 extends AQS
* 當shutdown的時候,會通過tryLock判斷線程是否正在執行任務,如果為false,表示線程不在執行任務,而是在等待新的任務,通過發送中斷信號,中斷這些線程的等待,這些線程被中斷後會判斷是由於什麼原因喚醒的,如果這個時候線程池狀態為SHUTDOWN,那麼這些線程就會被回收.
* 註:(線程被喚醒的原因可能是被中斷了,也有可能是有任務了,也有可能是時間到了,喚醒後需要二次判斷go getTask())
* 註:(線程由於沒有任務掛起(poll()), 掛起期間可能有新的任務過來了(offer())被喚醒,也有可能被中斷信號通知關閉而被喚醒.)
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* Worker繼承了AQS,也就是說Worker還有一個state屬性欄位,這個欄位是有必要分析下的:
* -1: 剛初始化
* 0: 剛調用runWorker或者沒任務了
* 1: 正在執行任務,正是通過這個state欄位,來判斷線程是否正在執行任務(tryLock)
*/
final Thread thread;
// 在Worker初始化時,firstTask可能有值
Runnable firstTask;
// 每個工作者線程完成的任務數,任務性質可以不同,即線程是可以復用的
volatile long completedTasks;
Worker(Runnable firstTask) {
// 只有在worker線程已開始的時候中斷才有意義,所以在初始化worker的時候state=-1,這個時候不會被中斷go isLocked()
setState(-1);
this.firstTask = firstTask;
// 初始化Workder的時候 通過 threadFactory創建線程,最終通過系統調用,由OS創建內核線程
this.thread = getThreadFactory().newThread(this);
}
// runWorker實際實現主執行迴圈,接下來就是重點了,任務線程初始化時,拿到了firstTask(有的話),以及一個新的線程,接下來就開始真正地執行任務了
public void run() {
runWorker(this);
}
// 設計worker類的主要目的,用來中斷線程
void interruptIfStarted() {
Thread t;
// 只有在worker線程已開始的時候中斷才有意義, 所以在初始化worker的時候state=-1,這個時候不會被中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker狀態描述
Worker主要有3個狀態
狀態 | 描述 |
---|---|
INIT(-1) | 初始Worker狀態 |
WAINTING TASK(0) | 等待任務到達 |
RUNING | 正在執行任務 |
Worker狀態機如圖
任務的提交與調度
在介紹完具體的ThreadPoolExecutor與Worker的描述以及狀態機後,我們先來大致看下ThreadPoolExecutor的工作流程,有助於理解後續的操作步驟.
從圖中我們可以看出,一個正常執行完成的任務其主要經過submit() -> addWorker()->worker.thread.start()->worker.run()->runWorker()→task.run()等步驟,下麵我們具體介紹下這些步驟.
任務調度方法主要在execute,具體源碼註釋如下:
點擊查看代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 線程池小於core
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 新增任務失敗,可能在addWorker的時候線程數達到了corePoolSize的水平,此時放到workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 判斷如果線程池正在shutdown,拒絕任務
if (!isRunning(recheck) && remove(command))
reject(command);
// 確保任務隊列中的任務可以被線程執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 工作隊列滿了,再嘗試增加worker,線程個數判斷使用 maxvalue
} else if (!addWorker(command, false))
reject(command);
}
工作線程的創建與執行
工作線程的創建主要根據線程池狀態,core和maximum參數判斷是否可以新增工作線程,如果新增成功,則開始執行任務.
工作線程的創建
點擊查看代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
// 當shutdown且 隊列是空的時候就沒必要加worker了
return false;
for (;;) {
int wc = workerCountOf(c);
// 達到限制數量了也返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 非shutdown,或者 是shutdown但是firstTask==null的時候,可以新增線程
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 新增worker的時候更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// start() -> runWorker()->task.run()
// 新增成功後 調用start(),如果start()失敗了,比如ntive stack申請失敗,也返回false
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
工作線程的執行
工作者線程的執行原理上比較簡單,既不斷從任務隊列中取出任務,執行任務,然後返回線程池並等待下一個任務。
// 典型的線程池工作者線程結構
public void run() {
Throwable thrown = null;
try {
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(Throwable e) {
throw = e;
}finaly {
threadExited(this,thrown);
}
}
}
下麵是ThreadPoolExecutor實際的工作者線程的任務執行,其中會涉及到線程的回收,任務的取消等實現.
點擊查看代碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// state: -1 => 0 , unlock -> release -> tryRelease -> state=0
// 這個時候任務線程開始工作,可以被中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// getTask從隊列中拿任務
while (task != null || (task = getTask()) != null) {
// 工作的時候將state置為1,表示正在工作,這個操作一定會成功(正常來說lock是一個基於狀態的方法,可能會阻塞調用線程),因為不會有其他地方調用w.lock
// 註:(state: 0 => 1 lock -> acquire -> tryAcquire -> state=1)
w.lock();
// 線程當且僅當池子stopping(shutdown,shutdownNow的時候)的時候才會interrupted,且一定要interrupted
// 註:(worker線程是由線程池服務來維護的,只有線程池服務有權對worker線程進行中斷操作)
if ((runStateAtLeast(ctl.get(), STOP) ||
// 註:(Thread.interrupted會清除interrupted標記)
// 這裡表明worker線程只能在STOPING(STOP,TIDING,TERMINATED)時中斷信號有效,其他形式的中斷信號(例如在任務中中斷)會被清除
(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)))
&&
!wt.isInterrupted())
wt.interrupt();
try {
// hooc 函數
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
// 保存異常 thrown 到1326處理(給客戶端提供的鉤子函數,afterExecute,使客戶端可以感知到任務失敗併進行特定的處理),同時拋出異常到
// 1330 處理(線程池自身對任務異常的處理)
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 將任務執行過程中的異常傳入到hooc函數
afterExecute(task, thrown);
}
} finally {
// beforeExecutehooc函數出錯或者任務出錯了的話,task=null,從而跳到1336,completedAbruptly=true,從而回收線程,即使線程並沒有完成任何工作
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理task=null的場景或者任務拋處異常時的場景,釋放線程,什麼時候task會為null ,go getTask
processWorkerExit(w, completedAbruptly);
}
}
服務的關閉,任務的取消與線程的回收
服務的關閉
通過調用shutdown或者shutdownNow給工作線程發送中斷信號嘗試取消任務,並回收線程,繼而關閉服務
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 狀態至為SHUTDOWN
advanceRunState(SHUTDOWN);
// 給每個idle工作線程(已啟動且沒任務的)線程發送中斷信號
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 狀態置為 STOP
advanceRunState(STOP);
// 給每個工作線程發送中斷信號
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
線程的回收
線程回收流程圖:
觸發線程的回收主要有下麵幾種情況
- 由於setMaximumPoolSize,導致currentSize > maximumPoolSize時,getTask()返回null
- 線程池狀態為stop時,即調用shutdownNow()時,getTask()返回null
- 線程池狀態為shutdown,即調用shutdown(),線程池給idle線程發送中斷信號,如果此時任務隊列為空時,getTask()返回null
- 線程等待任務超時,getTask()返回null
- 任務執行失敗,拋出運行時異常,導致task=null
當getTask()返回null或者task=null時,runWorker()跳到processWorkExit()處理線程回收,此時會新增線程來替換由於任務異常而被終止的線程
點擊查看代碼
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// stopped 或者 shutdown 且 workQueue.isEmpty 返回null 2,3
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut等價於 wc> corePooSize
// allowCoreThreadTimeOut, wc>corePoolSize, 一起表示 當任務線程獲取任務超時時,被要求中斷(subject to termination)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1 wc > maxPoolSize 或者 4 獲取任務超時且 要求獲取任務超時的進程被中斷(timed && timedOut)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果沒有任務,則阻塞在這裡, workQueue.offer後繼續運行
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
// r==null,poll返回null,表示timedOut,下次 go 1210
timedOut = true;
} catch (InterruptedException retry) {
// 忽略中斷信號
timedOut = false;
}
}
}
線程池通過workers.remove()操作來釋放worker的引用,從而由垃圾回收器回收線程,如果線程是由於任務執行異常而導致的終止,則會新增工作線程來替換它.
點擊查看代碼
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// getTask時會decrementWorkerCount
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 回收線程前先將線程執行的任務數加一下
completedTaskCount += w.completedTasks;
// 通過釋放worker引用來釋放線程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 如果不是由於任務忽然中斷且線程數符合最小值的要求,那麼無需addWorker替換
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果任務線程是由於任務執行異常退出的 或者 線程池中的數量小於min,addWorker
addWorker(null, false);
}
}
由上文我們瞭解到,無論是任務的取消,還是線程池服務的關閉,其中都是通過線程的中斷來實現的,理解了線程中斷我們就能夠理解任務的取消以及服務關閉的具體含義。
線程的中斷
中斷機制是一種Java線程間的通信機制,每個Java線程都有一個boolean類型的中斷狀態。當調用Thread.interrupt(),並不意味著立即停止目標線程正在執行的任務,只是傳遞一個中斷請求,將中斷狀態置為true。至於什麼時候讀這個狀態,以及基於這個狀態做什麼操作,則完全由任務自身去控制。(早期的jdk庫提供了Thread.stop(),Thread.suspend(),Thread.resume()來允許用戶暴力終止,暫停,恢復一個線程,在jdk1.2後這些方法就被置為deprecated了,因為這樣操作是不安全的,stop()會強制釋放掉線程持有的鎖,拋出ThreadDeathException,導致數據處於不一致的狀態,從而造成未知的後果)例如:
public class Factorizer {
private BigInteger lastNumber;
private BigInteger[] lastFactors;
public synchronized BigInteger[] cal(BigInteger number) {
if(Objects.equal(number,lastNumber)) {
return lastFactors;
}else {
//這兩步是複合操作,需要原子性,我們不會在這兩步之間判斷Thread.currentThread().isInterrupted()
lastFactors = factor(i);
lastNmuber=i;
return lastFactors;
}
}
}
jdk中有許多長時任務都是通過中斷機制取消任務的。它們對中斷的響應通常是:清除中斷狀態(Thread.interrupted()),然後拋出一個異常(InterruptedException),表示長時任務操作由於中斷而提前結束。(wait,join,sleep,FutureTask.get(),CoundownLatch.await,lockInterrrupted(),BlockQueue.poll()等)
在編寫任務的時候,基於這個狀態做什麼請求或者不做請求,例如重試或者忽略,都是可以的,只要滿足自身任務的需要即可。但設計糟糕的任務可能會屏蔽中斷請求,從而導致其他方法調用該任務的時候無法對中斷進行響應,例如:
不安全的中斷示例
public static void main(String[] args) {
Thread calPrimeTask = new Thread(InterruptedTest::calPrime);
calPrimeTask.start();
ThreadUtil.sleep(1000);
// 嘗試終止calPrimeTask
calPrimeTask.interrupt();
}
public static void calPrime() {
while(!Thread.currentThread().isInterrupted()) {
ThreadUtil.sleep(50);
log();
System.out.println("一個耗時50ms的任務完成!");
}
}
public static void log() {
/**
* 假設有一段代碼調用了jdk中的某個可能拋出InterruptedException的介面,這段代碼捕獲到這個異常後本意是不會處理這個異常,但是如果它沒有再
* Thread.currentThread().interrupt(),就會影響其他使用到這個方法的函數,例如calPrime();
*/
ArrayBlockingQueue<Integer> que = new ArrayBlockingQueue<>(12);
try {
System.out.println("do other thing");
que.poll(30, TimeUnit.MILLISECONDS);
}catch (InterruptedException e) {
e.printStackTrace();
// poll拋出 InterruptedException後,會清空 interrupted標記,這裡返回false
System.out.println(Thread.currentThread().isInterrupted());
// 如果這裡不重新設置interrupted標記的話,這回使得calPrimary任務無法取消,我們不知道調用棧的其他地方是否會用到中斷信號,所以必須把中斷信號設置回去
Thread.currentThread().interrupt();
}
}
// 不支持取消但仍可以調用可中斷阻塞方法,忽略中斷信號並重試
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while(true) {
try {
return queue.take();
}catch(InterruptedException e) {
interrupted = true;
// 忽略並重試
// 如果我們在這裡調用Thread.currentThread().interrupt()的話會引起死迴圈
}
}finally {
if(interrupted) {
// 避免屏蔽中斷信號,其他方法可能需要
Thread.currentThread().interrupt();
}
}
}
}
線程池的使用
在實際生產生活中,由於任務性質的多種多樣,我們往往會自定義符合各自應用場景的線程池來執行任務,不同的線程池參數設置意味著不同的任務執行策略(避免雞蛋放在一個籃子里)。
// 不自定義線程池的危害:可能造成無法預知的死鎖情況,次要的任務的執行影響重要的任務
public void deadLock() throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(1);
Callable<String> task1 = () -> {
ThreadUtil.sleep(2000);
countDownLatch.countDown();
return "task1 finished";
};
// task2 依賴 task1
Callable<String> task2 = () -> {
countDownLatch.await();
ThreadUtil.sleep(1000);
return "task2 finished";
};
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 假如 task2先於task1調度,就會發生死鎖,因為只有一個線程,task1在任務隊列里依賴task2的完成
Future<String> result2 = executorService.submit(task2);
Future<String> result1 = executorService.submit(task1);
System.out.println(result2.get() + result1.get());
};
那麼,線程池參數的選擇就顯得尤為重要。以下是一些ThreadPoolExecutor參數的介紹以及使用建議。
1 核心線程數與最大線程數
從上文的任務執行流程我們大致可以瞭解到,線程池主要通過這兩個參數控制工作線程的數量。在設置這兩個參數的時候需要註意以下兩個問題。
1.1 工作線程的大小設置
大小設置主要影響系統的吞吐量。如果設置過小造成資源利用率低,人為地降低了系統的吞吐量,如果設置過大會造成線程競爭加劇,使得消耗更多的的計算資源線上程上下文切換上而不是執行任務上,最終也會導致系統的吞吐量降低。
設置線程池大小主要以下幾種策略:
策略一
coreSize = 2×Ncpu,maxSize = 25×Ncpu
實際使用中我們往往會定義許多線程池,如果每個線程池的大小會導致核心線程越來越多,會使得競爭加劇,甚至達到操作系統限制的線程池數量
策略二
W/C: wait time/ compute time
Ucpu: 目標CPU利用率
Ncpu: Runtime.getRuntime().availableProcessors()
Nthreads = Ncpu * Ucpu * (1 + W/C)
I/O任務大部分時間都在等待I/O完成,這個值會比較大,上線前不好估計此值,而且線程池中的任務類型不一定都是一致的
策略三
QPS:每秒任務數 999線:單個任務的花銷
1s內一個線程能夠執行的任務數: 1/999線
1s內n個線程能執行的任務數:n * 1/999線
即 QPS=n/999線 ==> n = QPS*999線
核心線程:corePoolSize = QPS * 999線(單位:s)
timeout: 能容忍介面的最大響應時間
隊列大小:queueCapacity = corePoolSize/999線 * timeout = QPS * timeout
最大線程:maxPoolSize = (QPS峰值- queueCapacity) * 999線
此策略考慮了實際生產環境的任務使用情況,也是假定線程池中的任務是同類型的。
如果線程池中的任務不是服務間調用而是單獨的函數或者sql調用,那麼QPS和999線就不好估計了。
策略四
使用動態線程池,可以動態調整線程池參數,以應對不同的使用場景變化,且可以通過cat監控線程池的使用情況。
實際生產業務使用中建議使用動態線程池,動態調整任務執行策略,同時為避免線程資源浪費,搭配下文提到的allowCoreThreadTimeOut一起使用。
public static ThreadPoolExecutor getExecutor(String name,boolean allowCoreThreadTimeOut) {
ThreadPoolExecutor result = null;
try {
result = DynamicThreadPoolManager.getInstance().getThreadPoolExecutor(name);
}catch (PoseidonException e) {
log.error("ExecutorCase.getExecutor Error:",e);
}
if(Objects.isNull(result)) {
return ExecutorUtil.getExecutor(name,Runtime.getRuntime().availableProcessors());
}
// 任務完成後不需要留有核心線程可關閉
result.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return result;
}
1.2 工作線程的回收
線程本質上是兩個方法調用棧,是一個共用資源。當一個線程池處理QPS較低的任務時(eg:boss後臺的介面,或者執行周期長的定時任務),我們往往會想當無任務執行的時候線程池可以自動回收線程資源,於是將coreSize設置成0。
假如我們將coreSize=0,但卻使用的是有界隊列,比如new ArrayBlockingQueue<>()。按照上文的執行流程,那麼只有當任務塞滿任務隊列的時候,線程池才會正式開始執行任務。
為瞭解決這個問題,jdk1.6版本後的ThreadPoolExecutor提供了allowCoreThreadTimeOut欄位。將這個欄位置為ture後,我們不用設置coreSize=0,就可以讓線程在無任務的時候等待keepAliveTime時間,將coreThread回收。具體實現可以查看:getTask,processWorkerExit方法。
2 任務隊列
ThreadPoolExecutor使用的是BlockingQueue作為任務隊列,即任何阻塞隊列都可以用於任務的存儲和轉發。下麵介紹3種常見任務隊列的選擇策略
策略一
無界隊列(Unbounded queues)
例如LinkedBlockingQueue。使用無界隊列主要適用於任務執行時間很短且確定的任務,例如找出某個自然數的因數。這種任務一定能夠快速執行完成。但是實際業務場景中的任務執行時間通常是不確定的,需要遠程調用介面,有許多I/O操作,這樣就找出了任務消費很慢,如果此時有任務提交過來會找出OOM,從而影響整個服務的穩定。所以不建議使用。
策略二
有界隊列(Bounded queues)
例如ArrayBlockingQueue。用於限制資源的使用量,避免出現OOM。
調整任務隊列長度時往往也要調整最大線程數(maxmiumSize)。
問題 | |
---|---|
很大的queue.size,很小的maxmiumSize | 一方面降低CPU了使用率和線程切換頻率,避免過度競爭,從而導致人為的降低了吞吐量(可以是優點也可以是缺點) |
很小的queue.size,很大的maxmiumSize |
實際使用的時候大部分任務都是i/o密集型的,所以其實可以併發執行比我們想的更多的任務,適用於不緊急但希望儘可能快的任務,例如定時job任務或者導出任務。這種任務的執行我們希望在不影響其他服務的情況下儘可能快的執行。
策略三
直接處理任務(Direct handoffs)
例如:synchoronousQueue。當客戶端提交任務時,在有合適的線程執行此任務才返回,否則阻塞客戶端。
使用這個隊列少了入隊和出隊操作,效率更好,適用於需要儘快響應的任務,例如h5端的介面。
這種方式通常需要 unbounded maxmiumPoolSize, 即無限制的線程數,但是如果當客戶端不停地提交任務且消費不過來地時候,會有線程數瘋狂飆升,造成系統不穩定的風險。所以實際使用中還是會限制 maxmiumSize的值,可以通過使用下文中提到的CallerRunRejectPolicy來緩慢降低客戶端提交任務的速度。從而將非同步降級為同步執行。
3 任務拒絕策略
線上程池關閉(shutdown()),或者任務隊列滿了,工作線程也滿了的時候會執行RejectedExecutionHandler.rejectedExecution(),來拒絕任務,jdk為我們提供了以下四種拒絕策略,我們也可以自己定義合適的拒絕策略.
拒絕策略 | 描述 |
---|---|
ThreadPoolExecutor.AbortPolify | 預設拒絕策略,拋出RejectedExecutionException異常來通知客戶端有任務被拒絕,客戶端經常會忽略這個異常的處理導致發生線上問題 |
ThreadPoolExecutor.DiscardPolify | 丟棄最新提交的任務。一般沒有哪個任務是可以丟棄的,不建議使用。 |
ThreadPoolExecutor.DiscardOldestPolify | 丟棄最先提交的任務。這裡註意如果使用的是優先順序隊列的話,會拋棄最高優先順序的任務,隨意得謹慎使用 |
ThreadPoolExecutor.CallerRunPolify | 由客戶端線程執行任務,即 客戶端代碼 -> submit() -> task.run() ->客戶端代碼,可以降低任務的提交速度,使得由非同步執行降級為同步執行 |
4 工作線程工廠類
線程池使用線程工廠類來生成工作線程,我們可以自定義或者使用guava提供的ThreadFactoryBuilder()來創建線程工廠類
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(String threadPoolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = threadPoolName + "-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
// guava
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-pool-%d").build();
JDK平臺提供的預設線程池
Executors作為Executor介面的伴生類,提