本文將主要講解 J.U.C 中的 Future 框架,並分析結合源碼分析其內部結構邏輯; 一、Future 框架概述 JDK 中的 Future 框架實際就是 Future 模式的實現,通常情況下我們會配合線程池使用,但也可以單獨使用;下麵我們就單獨使用簡單舉例; 1. 應用實例 列印: 如上面代碼 ...
本文將主要講解 J.U.C 中的 Future 框架,並分析結合源碼分析其內部結構邏輯;
一、Future 框架概述
JDK 中的 Future 框架實際就是 Future 模式的實現,通常情況下我們會配合線程池使用,但也可以單獨使用;下麵我們就單獨使用簡單舉例;
1. 應用實例
FutureTask<String> future = new FutureTask<>(() -> {
log.info("非同步任務執行...");
Thread.sleep(2000);
log.info("過了很久很久...");
return "非同步任務完成";
});
log.info("啟動非同步任務...");
new Thread(future).start();
log.info("繼續其他任務...");
Thread.sleep(1000);
log.info("獲取非同步任務結果:{}", future.get());
列印:
[15:38:03,231 INFO ] [main] - 啟動非同步任務...
[15:38:03,231 INFO ] [main] - 繼續其他任務...
[15:38:03,231 INFO ] [Thread-0] - 非同步任務執行...
[15:38:05,232 INFO ] [Thread-0] - 過了很久很久...
[15:38:05,236 INFO ] [main] - 獲取非同步任務結果:非同步任務完成
如上面代碼所示,首先我們將要執行的任務包裝成 Callable
,這裡如果不需要返回值也可以使用 Runnable
;然後構建 FutureTask
由一個線程啟動,最後使用 Future.get()
獲取非同步任務結果;
2. Future 運行邏輯
對於 Future 模式的流程圖如下:
對比上面的實例代碼,大家可能會發現有些不一樣,因為在 FutureTask 同時繼承了 Runnable 和 Future 介面,所以再提交任務後沒有返回Future,而是直接使用自身調用 get;下麵我們就對源碼進行實際分析;
二、源碼分析
1. FutureTask 主體結構
public interface RunnableFuture<V> extends Runnable, Future<V> {}
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state; // 任務運行狀態
private Callable<V> callable; // 非同步任務
private Object outcome; // 返回結果
private volatile Thread runner; // 非同步任務執行線程
private volatile WaitNode waiters; // 等待非同步結果的線程棧(通過Treiber stack演算法實現)
public FutureTask(Callable<V> callable) { // 需要返回值
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
...
}
另外在代碼中還可以看見有很多地方都是用了 CAS
來更新變數,而 JDK1.6 中甚至使用了 AQS
來實現;其原因就是同一個 FutureTask
可以多個線程同時提交,也可以多個線程同時獲取; 所以代碼中有很多的狀態變數:
// FutureTask.state 取值
private static final int NEW = 0; // 初始化到結果返回前
private static final int COMPLETING = 1; // 結果賦值
private static final int NORMAL = 2; // 執行完畢
private static final int EXCEPTIONAL = 3; // 執行異常
private static final int CANCELLED = 4; // 任務取消
private static final int INTERRUPTING = 5; // 設置中斷狀態
private static final int INTERRUPTED = 6; // 任務中斷
同時源碼的註釋中也詳細給出了可能出現的狀態轉換:
- NEW -> COMPLETING -> NORMAL // 任務正常執行
- NEW -> COMPLETING -> EXCEPTION // 任務執行異常
- NEW ->CANCELLED // 任務取消
- NEW -> INITERRUPTING -> INTERRUPTED // 任務中斷
註意這裡的 COMPLETING
狀態是一個很微妙的狀態,正因為有他的存在才能實現無鎖賦值;大家先留意這個狀態,然後在代碼中應該能體會到;另外這裡還有一個變數需要註意,WaitNode
;使用 Treiber stack 演算法實現的無鎖棧;其原理說明可以參考下麵第三節;
2. 任務執行
public void run() {
if (state != NEW || // 確保任務執行完成後,不再重覆執行
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread())) // 確保只有一個線程執行
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 設置異常結果
}
if (ran) set(result); // 設置結果
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); // 確保中斷狀態已經設置
}
}
// 設置非同步任務結果
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設置一次
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion(); // 喚醒等待線程
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設置一次
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
3. 任務取消
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && // 只有在任務執行階段才能取消
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, // 設置取消狀態
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
註意 cancel(false)
也就是僅取消,並沒有打斷;非同步任務會繼續執行,只是這裡首先設置了 FutureTask.state = CANCELLED
,所以最後在設置結果的時候會失敗,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
;
4. 獲取結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // 阻塞等待
return report(s);
}
private V report(int s) throws ExecutionException { // 根據最後的狀態返回結果
Object x = outcome;
if (s == NORMAL) return (V)x;
if (s >= CANCELLED) throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q); // 移除等待節點
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 任務已完成
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 正在賦值,直接先出讓線程
Thread.yield();
else if (q == null) // 任務還未完成需要等待
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q); // 使用 Treiber stack 演算法
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
三、Treiber stack
在《Java 併發編程實戰》中講了, 創建非阻塞演算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變數上,同時還要維護數據的一致性 。
@ThreadSafe public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<>();
private static class Node <E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
}
總結
- 總體來講源碼比較簡單,因為其本身只是一個 Future 模式的實現
- 但是其中的狀態量的設置,還有裡面很多無鎖的處理方式,才是 FutureTask 帶給我們的精華!