1 簡介 上一篇博客“非同步任務服務簡介”對FutureTask做過簡要介紹與分析,這裡再次對FutureTask做一次深入的分析(基於JDK1.8)。 FutureTask同時實現了Future 、Runnable介面,因此它可以交給執行器Executor去執行這個任務,也可以由調用線程直接執行ru ...
1 簡介
上一篇博客“非同步任務服務簡介”對FutureTask做過簡要介紹與分析,這裡再次對FutureTask做一次深入的分析(基於JDK1.8)。
FutureTask同時實現了Future 、Runnable介面,因此它可以交給執行器Executor去執行這個任務,也可以由調用線程直接執行run方法。
根據FutureTask.run方法的執行狀態,可將其分為以下3種狀態
①未啟動: run方法還未被執行,FutureTask處於未啟動狀態。
②已啟動: run方法在執行過程中,FutureTask處於已啟動狀態
③已完成:run方法正常完成返回或被取消或執行過程中因異常拋出而非正常結束,FutureTask處於已完成狀態。
當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致調用線程阻塞;當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致調用線程立即返回結果或拋出異常。
當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來試圖停止任務;當FutureTask處於已啟動狀態時,執行 FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel方法將返回false (已完成的任務任務無法取消)。
2 用法示例
FutureTask因其自身繼承於Runnable介面,因此它可以交給執行器Executor去執行;另外它也代表非同步任務結果,它還可以通過ExecutorService.submit返回一個FutureTask。另外FutureTask也可單獨使用。為了更好的理解FutureTask ,下麵結合ConcurrentHashMap演示一個任務緩存。緩存中有多個任務,使用多線程去執行這些任務,一個任務最多被一個線程消費,若多個線程試圖執行這一個任務,只允許一個線程來執行,其他線程必須等待它執行完成。
import java.util.concurrent.*; public class FutureTaskTest { private final ConcurrentMap<String, Future<String>> taskCache = new ConcurrentHashMap<>(); public String executionTask(final String taskName) throws ExecutionException, InterruptedException { while (true) { Future<String> future = taskCache.get(taskName);// 從緩存中獲取任務 if (future == null) {//不存在此任務,新構建一個任務放入緩存,並啟動這個任務 Callable<String> task = () ->{ System.out.println("執行的任務名是"+taskName); return taskName; } ; // 1.2創建任務 FutureTask<String> futureTask = new FutureTask<String>(task); future = taskCache.putIfAbsent(taskName, futureTask);// 嘗試將任務放入緩存中 if (future == null) { future = futureTask; futureTask.run();//執行任務 } } try { //若任務在緩存中了,可以直接等待任務的完成 return future.get();// 等待任務執行完成 } catch (CancellationException e) { taskCache.remove(taskName, future); } } } public static void main(String[] args) { final FutureTaskTest taskTest = new FutureTaskTest(); for (int i = 0; i < 7; i++) { int finalI = i; new Thread(()->{ try { taskTest.executionTask("taskName" + finalI); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { taskTest.executionTask("taskName" + finalI); taskTest.executionTask("taskName" + finalI); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } }).start(); } } }
列印輸出
3 實現原理
1) 成員變數
它有一個成員變數state表示狀態
private volatile int state;
它有這些可能取值
private static final int NEW = 0;//剛開始的狀態或任務在運行中 private static final int COMPLETING = 1;//臨時狀態,任務即將結束,正在設置結果 private static final int NORMAL = 2;//任務正常完成 private static final int EXCEPTIONAL = 3;//因拋出異常而結束任務 private static final int CANCELLED = 4;//任務被取消 private static final int INTERRUPTING = 5;//任務正在被中斷 private static final int INTERRUPTED = 6;//任務被中斷(中斷的最終狀態)
state可能有這幾種狀態轉換
/** NEW -> COMPLETING -> NORMAL 正常結束任務時的狀態轉換流程 * NEW -> COMPLETING -> EXCEPTIONAL 任務執行過程中拋出了異常時的狀態轉換流程 * NEW -> CANCELLED 任務被取消時的狀態轉換流程 * NEW -> INTERRUPTING -> INTERRUPTED 任務執行過程中出現中斷時的狀態轉換流程 */
其他成員變數
private Callable<V> callable; private Object outcome; // non-volatile, protected by state reads/writes private volatile Thread runner; private volatile WaitNode waiters;
成員變數callable表示要執行的任務,
成員變數outcome表示任務的結果或任務非正常結束的異常
成員變數runner表示執行此任務的線程
成員變數waiter表示等待任務執行結果的等待棧(數據結構是單向鏈表) 。WaitNode是一個簡單的靜態內部,一個成員變數thread表示等待結果的線程,另一個成員變數next表示下一個等待節點(線程)。
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
2) 構造方法
FutureTask的構造方法會初始化callable和state ,它有兩個構造方法, 分別接受Callable和Runnable類型的待執行任務。但對於Runnable類型參數,它會調用Executors.callable將Runnable轉換為Callable類型實例,以便於統一處理。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Executors.callable方法也很簡單,它就返回了一個Callable的實現類RunnableAdapter類型的對象。
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
3) 主要API
(1) run與runAndReset
run方法是Funture最重要的方法,FutureTask的一切都是從run方法開始的,它是執行callable任務的方法。
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //將當前線程設置為執行任務的線程,CAS失敗就直接返回 return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call();//執行任務 ran = true; } catch (Throwable ex) { //運行時有異常,設置異常 result = null; ran = false; setException(ex); } if (ran) set(result);//設置結果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; //state已是最終狀態,不再變化,將runer設為null,防止run方法被併發調用 // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //清空運行線程runner後再重新獲取state,防止遺漏掉對中斷的處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
其主要邏輯是:
①檢查狀態,設置運行任務的線程
②調用callable的call方法去執行任務,並捕獲運行中可能出現的異常
③如果任務正常完成,調用set設置任務的結果,將state設為NORMAL, 將結果保存到outcome ,喚醒所有等待結果的線程
④若執行任務過程中發生了異常,調用setException設置異常,將state設為EXCEPTIONAL ,將此異常也保存到outcome ,喚醒所有等待結果的線程
⑤最後將運行線程runner清空,若狀態可能是任務被取消的中斷還要處理此中斷。
set 、setException方法分別用來設置結果、設置異常,但這僅是它們的主要邏輯,它們還會進行其他的處理。
它們會將結果或異常設置到成員變數outcome上,還會更新狀態state,最後調用finishCompletion從等待棧表中移除並喚醒所有(節點)線程(任務已完成,無需要等待,可以直接獲取結果,等待棧已沒有存在的意義了)。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
run方法中有對中斷的處理,我們來看看handlePossibleCancellationInterrupt方法怎麼處理中斷的.
這裡就是簡單地使當前線程讓出時間片,讓其他線程先執行任務,即線程禮讓。
private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
runAndReset方法是FutureTask類自己添加的protected級別的方法(供子類調用), 這個方法主要用來執行可多次執行且不需要結果的任務,只有在任務運行和重設成功時才返回true 。定時任務執行器ScheduledThreadPoolExecutor的靜態內部ScheduledFutureTask的run方法調用了這個API.
和run方法相比,runAndSet方法與之邏輯大致相同,只是runAndSet沒用調用set方法設置結果(本身不需要結果,也是出於防止state被修改的目的)
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //任務已啟動或CAS設置運行任務的的線程失敗,直接返回false return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result 沒有調用set(V)方法,不設置結束 ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; //任務成功運行且state還是NEW時返回true,反之返回false }
(2) get方法
get方法用於獲取任務的最終結果,它有兩個版本,其中一個是超時版本。兩個版本的最主要的區別在於,非超時版本可以不限時長地等待結果返回 ,另外非超時版本不會拋出TimeoutException超時異常。get方法超時版本的基本邏輯:若任務未完成就等待任務完成,最後調用report報告結果,report會根據狀態返回結果或拋出異常。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L);//awaitDone第一個參數為false,表示可以無限時長等待 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING &&//還未完成 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待完成 throw new TimeoutException();//到了限定時間,任務仍未完成,拋出超時異常TimeoutException return report(s);//報告結果 }
get方法的核心實現在於調用awaitDone方法,awaitDone用於等待任務的結果,若任務未完成awaitDone會阻塞當前線程。
awaitDone方法的基本邏輯:①若執行任務時出現了中斷,則拋出InterruptedException異常;②若此時任務已完成,就返回最新的state,③若任務即將完成就使當前線程讓出CPU時間片,讓其他線程先執行;④若任務還在執行中,就將當前線程加入到等待棧中,然後讓當前線程休眠直到超出限定時間或等待任務完成時run方法調用finishCompletion喚醒線程(run方法中的set或setException調用finishCompletion,而finishCompletion又會調用LockSupport.unpark).
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { //被中斷了,就在等待棧表中移除這個線程,並拋出中斷異常 removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { //任務完成了,將當前線程從等待隊列中清空,返回最新的狀態 if (q != null) q.thread = null; return s; } //任務即將完成,當前線程禮讓,讓其他線程執行 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) //初始化當前線程對應的節點 q = new WaitNode(); else if (!queued) //如果之前入棧失敗,再次嘗試入棧(CAS更新),將當前節點設為等待棧的棧頂 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //如果設置了超時時間 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //如果任務執行時長已經超出了給定的時間,從等待棧中移除當前節點(線程) removeWaiter(q); return state; } //讓當前線程休眠等待給定的時間(或等到run方法中的set或setException調用finishCompletion來喚醒) LockSupport.parkNanos(this, nanos); } else//未設置超時時間 //讓當前線程無限時長休眠等待,直到任務完成時run方法中的set或setException調用finishCompletion來喚醒此線程 LockSupport.park(this); } }
上面的awaitDone方法中調用removeWaiter來移除等待棧表的中斷和超時節點。
其內部實現不容易理解,但主要邏輯還是很清楚的:從頭到尾遍歷鏈表,將鏈表中的中斷/超時節點移除出鏈表,若有線程競爭就重頭開始再次遍歷鏈表檢查並移除無效節點。
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; //先將節點對應的線程清空,下麵的"q.thread != null"就能判斷節點是否超時或中斷節點。 retry: for (;;) { // restart on removeWaiter race //q表示當前遍歷到的節點,pred表示q的前驅節點,s表示q的後繼節點 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//遍歷完鏈表才能退出內迴圈 s = q.next; //q.thread!=null 表示這不是超時或中斷的節點,它是效節點,不能被從棧表中移除 //(removeWaiter的開頭將超時或中斷的節點在thread賦空,可見node.thread=null代碼) if (q.thread != null) pred = q; //得到下次迴圈時q的前驅節點 else if (pred != null) { //q.thread== null 且pred!=null,需要將無效節點q從棧表中移除 //將q的前驅、後繼節點直接鏈接在一起,q本身被移除出棧表了 pred.next = s; //這裡是從前向後遍歷鏈表的,無競爭情況下,不可能沒檢查到當前節點的前面還有無效節點, //那麼一定有其他線程修改了當前節點q的前驅,些時有線程競爭,需要從鏈表的頭部重新遍歷檢查 if (pred.thread == null) // check for race continue retry; } // pred==null且q.thread=null //q的前驅節點為空,表明q是鏈表的頭節點 //q.thread==null,表明q是無效節點 //無效節點不能作為鏈表的頭節點,所以要更新頭節點,將q的後繼節點s作為鏈表新的頭節點 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, //CAS更新頭節點 q, s)) //CAS更新失敗,重試 continue retry; } break; } } }
get方法需要調用report方法來報告結果,而report方法的基本邏輯也簡單:若是任務正常結束就返回這個任務的結果,若是任務被取消,就拋出CancellationException異常,若是在執行任務過程中發生了異常就將其統一封裝成ExecutionException並拋出。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
(3) cancel方法
cancel方法用於取消任務,我們可以看看cancel(boolean)方法如何實現的
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //①不是NEW狀態,表示任務至少是COMPLETING(即將結束)狀態,返回false //②CAS更新state為INTERRUPTING或CANCELLED失敗,返回false //只有state狀態更新成功,才能取消任務(防止被併發調用) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) {//允許中斷就設置中斷標誌 try { Thread t = runner; if (t != null) t.interrupt();//設置中斷標誌 } finally { // final state 設置中斷的最終狀態 //INTERRUPTING -> INTERRUPTED ,將state由“正在中斷”更新為”已經中斷“ UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //從等待棧上喚醒並移除所有的線程(節點) finishCompletion(); } return true; }
其基本邏輯:
①任務已結束或被取消,返回false
②若mayInterruptIfRunning為true,調用interrupt設置中斷標誌,將state設置為INTERRUPTED,若mayInterruptIfRunning為false,將state設為CANCELLED.
③調用finishCompletion喚醒並移除等待棧中的所有線程
finishCompletion()主要是處理任務結束後的掃尾工作,其主要邏輯是:將等待棧waiters賦空,喚醒並移除等待棧上的所有節點(線程),最後再將任務callable賦空。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //任務取消後,等待棧表沒有存在的意義了,將等待棧waiters賦為null, if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //遍歷鏈表(棧),將所有節點移除,並喚醒節點對應的線程 Thread t = q.thread; if (t != null) { //將節點上的線程清空 q.thread = null; LockSupport.unpark(t);//喚醒此線程 } WaitNode next = q.next; if (next == null)//鏈表到尾了,退出遍歷 break; q.next = null; // unlink to help gc 將節點next屬性清空,方便垃圾回收 q = next;//向後移動一個節點 } break; } } done();//空方法,留給子類重寫 callable = null; //賦空,減少痕跡 // to reduce footprint }
(4) 其他輔助方法
isCancelled方法返回任務是否被取消的布爾值
isDone方法返回任務是否完成的布爾值(非正常結束也行)
isCancelled 、isDone都是直接根據state確定任務的狀態。
public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; }