Future模式 【1】Future模式是多線程開發中常見的設計模式,它的核心思想是非同步調用。對於Future模式來說,它無法立即返回你需要的數據,但是它會返回一個契約,將來你可以憑藉這個契約去獲取你需要的信息。 【2】通俗一點就是生產者-消費者模型的擴展。經典“生產者-消費者”模型中消息的生產者不 ...
Future模式
【1】Future模式是多線程開發中常見的設計模式,它的核心思想是非同步調用。對於Future模式來說,它無法立即返回你需要的數據,但是它會返回一個契約,將來你可以憑藉這個契約去獲取你需要的信息。
【2】通俗一點就是生產者-消費者模型的擴展。經典“生產者-消費者”模型中消息的生產者不關心消費者何時處理完該條消息,也不關心處理結果。Future模式則可以讓消息的生產者等待直到消息處理結束,如果需要的話還可以取得處理結果。
java中是如何實現Future模式
【1】直接繼承Thread或者實現Runnable介面都可以創建線程,但是這兩種方法都有一個問題 就是:沒有返回值,也就是不能獲取執行完的結果。
【2】因此java1.5就提供了Callable介面來實現這一場景,而Future和FutureTask就可以和Callable介面配合起來使用。【從而達到Future模式的效果】
Callable和Runnable的區別
【1】源碼展示
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
【2】分析說明
Runnable 的缺陷:1.不能返回一個返回值 2.不能拋出 checked Exception。
Callable的call方法可以有返回值,可以聲明拋出異常。
【3】疑問解析
1)為什麼需要 Callable?
Callable 配合 Future 類 可以瞭解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 做不到的,因為它沒有返回值,不能拋出異常。
瞭解Future介面
【1】介紹 :Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。 必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。
【2】源碼展示
public interface Future<V> { // 取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束 boolean cancel(boolean mayInterruptIfRunning); //任務是否已經取消,任務正常完成前將其取消,則返回true boolean isCancelled(); //需要註意的是如果任務正常終止、異常或取消,都將返回true boolean isDone(); //取得返回對象 V get() throws InterruptedException, ExecutionException; //取得返回對像,允許等待設置的時間範圍 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
瞭解FutureTask類(Future介面的實現類)
【1】介紹說明
1)該對象相當於是消費者和生產者的橋梁,消費者通過FutureTask存儲任務的處理結果,更新任務的狀態:未開始、正在處理、已完成等。而生產者拿到的 FutureTask被轉型為Future介面,可以阻塞式獲取任務的處理結果,非阻塞式獲取任務處理狀態。
2)FutureTask既可以被當做Runnable來執行,也可以被當做Future來獲取Callable的返回結果。
【2】代碼展示
0)繼承關係
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>
1)屬性值
// 表示當前任務的狀態 private volatile int state; // 表示當前任務的狀態是新創建的,尚未執行 private static final int NEW = 0; // 表示當前任務即將結束,還未完全結束,值還未寫,一種臨界狀態 private static final int COMPLETING = 1; // 表示當前任務正常結束 private static final int NORMAL = 2; // 表示當前任務執行過程中出現了異常,內部封裝的callable.call()向上拋出異常了 private static final int EXCEPTIONAL = 3; // 表示當前任務被取消 private static final int CANCELLED = 4; // 表示當前任務中斷中 private static final int INTERRUPTING = 5; // 表示當前任務已中斷 private static final int INTERRUPTED = 6; // 我們在使用FutureTask對象的時候,會傳入一個Callable實現類或Runnable實現類,這個callable存儲的就是 // 傳入的Callable實現類或Runnable實現類(Runnable會被使用修飾者設計模式偽裝為) private Callable<V> callable; // 正常情況下,outcome保存的是任務的返回結果 // 不正常情況下,outcome保存的是任務拋出的異常 private Object outcome; // 保存的是當前任務執行期間,執行任務的線程的引用 private volatile Thread runner; // 因為會有很多線程去get結果,這裡把線程封裝成WaitNode,一種數據結構:棧,頭插頭取 private volatile WaitNode waiters; static final class WaitNode { // 線程對象 volatile Thread thread; // 下一個WaitNode結點 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
2)構造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { //封裝成callable,但返回值為傳入的值 this.callable = Executors.callable(runnable, result); this.state = NEW; }
3)核心方法
1.run()方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 只有當任務狀態為new並且runner舊值為null才會執行到這裡 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 調用callable.run()並返回結果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // 內部設置outcome為callable執行的結果,並且更新任務的狀態為NORMAL(任務正常執行)並且喚醒阻塞的線程 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) // 如果當前任務處於中斷中,則執行這個方法線程會不斷讓出cpu直到任務處於已中斷狀態 handlePossibleCancellationInterrupt(s); } }
2.set(V v)方法
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 設置outcome(結果)為callable.run()返回的結果 outcome = v; //修改狀態 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 喚醒調用get()的所有等待的線程並清空棧 finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 設置outcome(結果)為callable.run()拋出的異常 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
3.get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 條件成立會調用awaitDone方法自旋等待直到任務完成 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } //這個方法是真正用來獲取任務的返回結果的,這個方法在get()方法裡面會被調用,如果該方法被調用,說明任務已經執行完了。 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
4.awaitDone(boolean timed, long nanos)方法
// 這個方法的作用是等待任務被完成(正常完成或出現異常完成都算完成),被中斷,或是被超時 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果當前線程出現中斷異常,則將該線程代表的WaitNode結點移出棧並拋出中斷異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果當前任務狀態大於COMPLETING,說明當前任務已經有結果了(任務完成、中斷、取消),直接返回任務狀態 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 當前任務處於臨界狀態,即將完成,則當前線程釋放cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次自旋,如果當前WitNode為null,new一個WaitNode結點 else if (q == null) q = new WaitNode(); // 第二次自旋,如果當前WaitNode節點沒有入隊,則嘗試入隊 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); // 第三次自旋,到這裡表示是否定義了超時時間 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 未超出時間,掛起當前線程一定時間 LockSupport.parkNanos(this, nanos); } else // 掛起當前線程,該線程會休眠(什麼時候該線程會繼續執行呢?除非有其他線程調用unpark()或者中斷該線程) LockSupport.park(this); } }
5.finishCompletion()方法
//任務執行完成(正常結束和非正常結束都代表任務執行完成)會調用這個方法來喚醒所有因調用get()方法而陷入阻塞的線程。 private void finishCompletion() { // 如果條件成立,說明當前有陷入阻塞的線程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 執行到這裡說明還有因調用get()而陷入阻塞的線程,自旋接著喚醒 // 這裡q.next設置為null幫助GC(垃圾回收) q.next = null; // unlink to help gc q = next; } break; } } //拓展方法 done(); // 將callable設置為null,方便GC callable = null; }
【3】註意事項
1)當 for 迴圈批量獲取 Future 的結果時容易 block,get 方法調用時應使用 timeout限制
2)Future 的生命周期不能後退。一旦完成了任務,它就永久停在了“已完成”的狀態,不能從頭再來
3)FutureTask 一般是結合線程池使用,然後額外採用FutureTask獲取結果。
【4】Future的局限性
從本質上說,Future表示一個非同步計算的結果。它提供了isDone()來檢測計算是否已經完成,並且在計算結束後,可以通過get()方法來獲取計算結果。在非同步計算中,Future確實是個非常優秀的介面。但是,它的本身也確實存在著許多限制:
1)併發執行多任務:Future只提供了get()方法來獲取結果,並且是阻塞的。所以,除了等待你別無他法;
2)無法對多個任務進行鏈式調用:如果你希望在計算任務完成後執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
3)無法組合多個任務:如果你運行了10個任務,並期望在它們全部執行結束後執行特定動作,那麼在Future中這是無能為力的;
4)沒有異常處理:Future介面中沒有關於異常處理的方法;
瞭解CompletionService介面
【1】介紹
1)CompletionService 介面是一個獨立的介面,並沒有擴展 ExecutorService 。 其預設實現類是ExecutorCompletionService;
2)介面CompletionService 的功能是:以非同步的方式一邊執行未完成的任務,一邊記錄、處理已完成任務的結果。讓兩件事分開執行,任務之間不會互相阻塞,可以實現先執行完的先取結果,不再依賴任務順序了。
3)簡單來說,CompletionService 就是監視著 Executor線程池執行的任務,用 BlockingQueue 將完成的任務的結果存儲下來。(當然,這個也可以是程式員自己去實現,但是要不斷遍歷與每個任務關聯的 Future,然後不斷去輪詢,判斷任務是否已經完成,比較繁瑣);
【2】源碼展示
public interface CompletionService<V> { //提交一個 Callable 任務;一旦完成,便可以由take()、poll()方法獲取 Future<V> submit(Callable<V> task); //提交一個 Runnable 任務,並指定計算結果; Future<V> submit(Runnable task, V result); //獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。 Future<V> take() throws InterruptedException; //獲取並移除表示下一個已完成任務的 Future,如果不存在這樣的任務,則返回 null。 Future<V> poll(); //獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則將等待指定的時間(如果有必要) Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
瞭解ExecutorCompletionService類(CompletionService介面的實現類)
【1】介紹
1)內部通過阻塞隊列+FutureTask,實現了任務先完成可優先獲取到,即結果按照完成先後順序排序,內部有一個先進先出的阻塞隊列,用於保存已經執行完成的Future,通過調用它的take方法或poll方法可以獲取到一個已經執行完成的Future,進而通過調用Future介面實現類的get方法獲取最終的結果。
【2】源碼分析
1)屬性分析
//線程池 private final Executor executor; //判斷線程池是否繼承抽象類 private final AbstractExecutorService aes; //阻塞隊列 private final BlockingQueue<Future<V>> completionQueue;
2)構造方法
//對於線程池必須定義,而阻塞隊列會有預設的 //而預設的LinkedBlockingQueue對於併發編程來說是存在隱患的(依據阿裡手冊來說,因為隊列的無盡性會導致OOM) //所以一般考慮要你自己去定義阻塞隊列 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; //如果是繼承了抽象類的實現 this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
3)阻塞隊列元素的定義
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //FutureTask裡面的拓展方法,在run的時候會被調用,所以是做完任務了會自動提交到隊列裡面 protected void done() { completionQueue.add(task); } private final Future<V> task; }
4)實現介面的方法
//採用newTaskFor來封裝非標準的取消 //因為傳入的Callable或Runnable,這種不是FutureTask,故需要封裝 private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } //下麵是對介面定義的方法的實現 public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
【3】彙總說明
1)說白了就是基於FutureTask 是單線程的任務,考慮可以等待獲取返回結果,那麼應該可以採用線程池的方法形成多任務併發的結果。
2)故定義了CompletionService介面作為規範,ExecutorCompletionService類作為具體的實現類【作為管理者】,不然每次採用線程池來做的話都要自己定義去管理。
3)當需要批量提交非同步任務的時候建議你使用CompletionService。CompletionService將線程池Executor和阻塞隊列BlockingQueue的功能融合在了一起,能夠讓批量非同步任務的管理更簡單。
4)CompletionService能夠讓非同步任務的執行結果有序化。先執行完的先進入阻塞隊列,利用這個特性,你可以輕鬆實現後續處理的有序性,避免無謂的等待,同時還可以快速實現諸如Forking Cluster這樣的需求。
5)線程池隔離。CompletionService支持自己創建線程池,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。
【4】示例展示
1)示例代碼
public class CompletionServiceDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { //創建線程池 ExecutorService executor = Executors.newFixedThreadPool(10); //創建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); //非同步向電商S1詢價 cs.submit(() -> getPriceByS1()); //非同步向電商S2詢價 cs.submit(() -> getPriceByS2()); //非同步向電商S3詢價 cs.submit(() -> getPriceByS3()); //將詢價結果非同步保存到資料庫 for (int i = 0; i < 3; i++) { //從阻塞隊列獲取futureTask Integer r = cs.take().get(); executor.execute(() -> save(r)); } executor.shutdown(); } private static void save(Integer r) { System.out.println("保存詢價結果:{}"+r); } private static Integer getPriceByS1() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("電商S1詢價信息1200"); return 1200; } private static Integer getPriceByS2() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(8000); System.out.println("電商S2詢價信息1000"); return 1000; } private static Integer getPriceByS3() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(3000); System.out.println("電商S3詢價信息800"); return 800; } }
瞭解CompletableFuture
【1】介紹
1)簡單的任務,用Future獲取結果還好,但我們並行提交的多個非同步任務,往往並不是獨立的,很多時候業務邏輯處理存在串列[依賴]、並行、聚合的關係。如果要我們手動用 Fueture 實現,是非常麻煩的。
2)CompletableFuture是Future介面的擴展和增強。CompletableFuture實現了Future介面,併在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。藉助這項能力,我們可以輕鬆地組織不同任務的運行順序、規則以及方式。從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要複雜的邏輯處理,不僅耗費精力且難以維護。
3)CompletableFuture除了實現Future介面還實現了CompletionStage介面。
4)CompletionStage介面: 執行某一個階段,可向下執行後續階段。非同步執行,預設線程池是ForkJoinPool.commonPool()。
【2】常用方法
1)描述依賴關係:
1.thenApply() 把前面非同步任務的結果,交給後面的Function
2.thenCompose()用來連接兩個有依賴關係的任務,結果由第二個任務返回
2)描述and聚合關係:
1.thenCombine:任務合併,有返回值
2.thenAccepetBoth:兩個任務執行完成後,將結果交給thenAccepetBoth消耗,無返回值。
3.runAfterBoth:兩個任務都執行完成後,執行下一步操作(Runnable)。
3)描述or聚合關係:
1.applyToEither:兩個任務誰執行的快,就使用那一個結果,有返回值。
2.acceptEither: 兩個任務誰執行的快,就消耗那一個結果,無返回值。
3.runAfterEither: 任意一個任務執行完成,進行下一步操作(Runnable)。
4)並行執行:
1.CompletableFuture類自己也提供了anyOf()和allOf()用於支持多個CompletableFuture並行執行
【3】創建非同步操作
1)CompletableFuture 提供了四個靜態方法來創建一個非同步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2)這四個方法區別在於:
1.runAsync 方法以Runnable函數式介面類型為參數,沒有返回結果,supplyAsync 方法Supplier函數式介面類型為參數,返回結果類型為U;Supplier 介面的 get() 方法是有返回值的(會阻塞)。
2.沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行非同步代碼。如果指定線程池,則使用指定的線程池運行。
3.預設情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池,這個線程池預設創建的線程數是 CPU 的核數(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數)。如果所有 CompletableFuture 共用一個線程池,那麼一旦有任務執行一些很慢的 I/O 操作,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進而影響整個系統的性能。所以,強烈建議你要根據不同的業務類型創建不同的線程池,以避免互相干擾。
3)supplyAsync的兩種獲取結果的方法join&get
1.join()和get()方法都是用來獲取CompletableFuture非同步之後的返回值。join()方法拋出的是uncheck異常(即未經檢查的異常),不會強制開發者拋出。get()方法拋出的是經過檢查的異常,ExecutionException, InterruptedException 需要用戶手動處理(拋出或者 try catch)
【3】常用方法的使用與介紹
1)結果處理
1.介紹:
//當CompletableFuture的計算結果完成,或者拋出異常的時候,我們可以執行特定的 Action。主要是下麵的方法: public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) //Action的類型是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結果,或者異常情況。 //方法不以Async結尾,意味著Action使用相同的線程執行,而Async可能會使用其它的線程去執行(如果使用相同的線程池,也可能會被同一個線程選中執行)。 //這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常
2.示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } if (new Random().nextInt(10) % 2 == 0) { int i = 12 / 0; } System.out.println("執行結束!"); return "test"; }); //whenComplete一般搭配exceptionally一起使用,一個處理結果,一個處理異常 future.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable action) { System.out.println(t+" 執行完成!"); } }); future.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable t) { System.out.println("執行失敗:" + t.getMessage()); return "異常xxxx"; } });
2)結果轉換
1.介紹:所謂結果轉換,就是將上一段任務的執行結果作為下一階段任務的入參參與重新計算,產生新的結果。
2.方法列舉:
【1】thenApply
1.說明
//thenApply 接收一個函數作為參數,使用該函數處理上一個CompletableFuture 調用的結果,並返回一個具有處理