通過本文可以瞭解FutureTask任務執行的方式以及Future.get已阻塞的方式獲取線程執行的結果原理,並且從代碼中可以瞭解FutureTask的任務執行狀態以及狀態的變化過程。 ...
1、FutureTask對象介紹
Future對象大家都不陌生,是JDK1.5提供的介面,是用來以阻塞的方式獲取線程非同步執行完的結果。
在Java中想要通過線程執行一個任務,離不開Runnable與Callable這兩個介面。
Runnable與Callable的區別在於,Runnable介面只有一個run方法,該方法用來執行邏輯,但是並沒有返回值;而Callable的call方法,同樣用來執行業務邏輯,但是是有一個返回值的。
Callable執行任務過程中可以通過FutureTask獲得任務的執行狀態,並且可以在執行完成後通過Future.get()方式獲取執行結果。
Future是一個介面,而FutureTask就是Future的實現類。並且FutureTask實現了 RunnableFuture(Runnable + Future),說明我們可以創建一個FutureTask並直接把它放到線程池執行,然後獲取FutureTask的執行結果。
2、FutureTask源碼解析
2.1 主要方法和屬性
那麼FutureTask是如何通過阻塞的方式來獲取到非同步線程執行的結果的呢?我們看下FutureTask中的屬性。
// FutureTask的狀態及其常量
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// callable對象,執行完後置空
private Callable<V> callable;
// 要返回的結果或要引發的異常來自 get() 方法
private Object outcome; // non-volatile, protected by state reads/writes
// 執行Callable的線程
private volatile Thread runner;
// 等待線程的一個鏈表結構
private volatile WaitNode waiters;
FutureTask中幾個比較重要的方法。
// 取消任務的執行
boolean cancel(boolean mayInterruptIfRunning);
// 返回任務是否已經被取消
boolean isCancelled();
// 返回任務是否已經完成,任務狀態不為NEW即為完成
boolean isDone();
// 通過get方法獲取任務的執行結果
V get() throws InterruptedException, ExecutionException;
// 通過get方法獲取任務的執行結果,帶有超時,如果超過給定時間則拋出異常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.2 FutureTask執行
當我們線上程池中執行一個Callable方法時,其實是將Callable任務封裝成一個RunnableFuture對象去執行,同時將這個RunnableFuture對象返回,這樣我們就拿到了FutureTask的引用,可以隨時獲取到任務執行的狀態,並且可以在任務執行完成後通過該對象獲取執行結果。
以下為ThreadPoolExecutor線程池提交一個callable方法的源碼。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
2.3 run方法介紹
RunnableFuture其實也是一個可以執行的runnable,我們看下他的run方法。其主要流程就是執行call方法,正常執行完畢後將result結果賦值到outcome屬性上。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 將callable賦值到本地變數
Callable<V> c = callable;
// 判斷callable不為空並且FutureTask的狀態必須為新創建
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執行call方法(用戶自己實現的call邏輯),並獲取到result結果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 如果執行過程出現異常,則將異常對象賦值到outcome上
setException(ex);
}
// 如果正常執行完畢,則將result賦值到outcome屬性上
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以下邏輯為正常執行完成後賦值的邏輯。
// 如果任務沒有被取消,將future執行完的返回值賦值給result結果
// FutureTask任務的執行狀態是通過CAS的方式進行賦值的,並且由此可知,COMPLETING其實是一個瞬時狀態
// 當將線程執行結果賦值給outcome後,狀態會修改為對應的NORMAL,即正常結束
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
以下為執行異常時賦值邏輯,直接將Throwable對象賦值到outcome屬性上。
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
無論是正常執行還是異常執行,最終都會調用一個finishCompletion方法,用來做工作的收尾工作。
2.4 get方法介紹
Future的get方法有兩個重載的方法,一個是get()獲取結果,一個是get(long, TimeUnit)帶有超時時間的獲取結果,我們看下FutureTask中的這兩個方法是如何實現的。
// 不帶有超時時間,一直阻塞直到獲取結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 等待結果完成,帶有超時的get方法也是調用的awaitDone方法
s = awaitDone(false, 0L);
// 返回結果
return report(s);
}
// 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果任務未中斷,調用awaitDone方法等待任務結果
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 返回結果
return report(s);
}
我們主要看下awaitDone方法的執行邏輯。此方法會通過for迴圈的方式一直阻塞等待任務執行完成。如果帶有超時時間,則超過截止時間後會直接返回。
// timed:是否需要超時獲取
// nanos:超時時間單位納秒
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 此方法會一直for迴圈判斷任務狀態是否已經完成,是Future.get阻塞的原因
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任務狀態大於COMPLETING,則表明任務結束,直接返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
// Thread.yield() 方法,使當前線程由執行狀態,變成為就緒狀態,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
// COMPLETING狀態為瞬時狀態,任務執行完成,要麼是正常結束,要麼異常結束,後續會被置為NORMAL或者EXCEPTIONAL
Thread.yield();
else if (q == null)
// 每調用一次get方法,都會創建一個WaitNode等待節點
q = new WaitNode();
else if (!queued)
// 將該等待節點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果方法帶有超時判斷,則判斷當前時間是否已經超過了截止時間,如果超過了及截止日期,則退出迴圈直接返回當前狀態,此時任務狀態一定是NEW
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
我們在看下report方法,在調用get方法時是如何返回結果的。
這裡首先獲取outcome的值,並判斷任務是否已經執行完成,如果執行完成,則將outcome對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個CancellationException異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀態位EXCEPTIONAL,此時的outcome即為Throwable對象,所以將outcome強轉為Throwable並拋出異常。
由此可以知道,我們將一個FutureTask任務submit到線程池中執行的時候,如果發生了異常,是會在調用get方法的時候拋出的。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
2.5 cancel方法介紹
cancel方法用於取消正在運行的任務,如果任務取消成功,則返回TRUE,如果取消失敗則返回FALSE。
// mayInterruptIfRunning:允許中斷正在運行的任務
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning如果為true則將狀態置為INTERRUPTING,如果未false則將狀態置為CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
// 如果狀態修改成功後,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 取消後的收尾工作
finishCompletion();
}
return true;
}
2.6 isDone/isCancelled方法介紹
isDone方法用於判斷FutureTask是否已經完成;isCancelled方法用來判斷FutureTask是否已經取消,這兩個方法都是通過狀態位來判斷的。
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
2.7 finishCompletion方法介紹
我們看下finishCompletion方法都做了哪些工作。
// 刪除所有等待線程併發出信號,最後執行done方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
我們看到done方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實現相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protected void done() { }
3、總結
通過源碼解讀可以瞭解到Future的原理:
第一步:主線程將任務封裝成一個Callable對象,通過submit方法提交到線程池去執行。
第二步:線程池執行任務的run方法,主線程則可以繼續執行其他邏輯。
第三步:線程池中方法執行完成後將結果賦值到outcome屬性上,並修改任務狀態。
第四步:主線程在需要拿到非同步任務結果的時候,主動調用fugure.get()方法來獲取結果。
第五步:如果非同步線程在執行過程中發生異常,則會在調用future.get()方法的時候拋出來。
以上就是對於FutureTask的分析,我們可以瞭解FutureTask任務執行的方式以及Future.get已阻塞的方式獲取線程執行的結果原理,並且從代碼中可以瞭解FutureTask的任務執行狀態以及狀態的變化過程。
作者:京東物流 丁冬
來源:京東雲開發者社區 自猿其說Tech