【併發編程】Future模式添加Callback及Promise 模式 ...
Future
Future是Java5增加的類,它用來描述一個非同步計算的結果。你可以使用 isDone
方法檢查計算是否完成,或者使用 get
方法阻塞住調用線程,直到計算完成返回結果。你也可以使用 cancel
方法停止任務的執行。下麵來一個慄子:
public class FutureDemo { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(10); Future<Integer> f = es.submit(() ->{ Thread.sleep(10000); // 結果 return 100; }); // do something Integer result = f.get(); System.out.println(result); // while (f.isDone()) { // System.out.println(result); // } } }
在這個例子中,我們往線程池中提交了一個任務並立即返回了一個Future對象,接著可以做一些其他操作,最後利用它的 get
方法阻塞等待結果或 isDone
方法輪詢等待結果(關於Future的原理可以參考之前的文章:【併發編程】Future模式及JDK中的實現)
雖然這些方法提供了非同步執行任務的能力,但是對於結果的獲取卻還是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。
阻塞的方式顯然和我們的非同步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時的得到計算結果,為什麼不能用觀察者設計模式當計算結果完成及時通知監聽者呢?
很多語言,比如Node.js,採用Callback的方式實現非同步編程。Java的一些框架,比如Netty,自己擴展了Java的 Future
介面,提供了 addListener
等多個擴展方法。Google的guava也提供了通用的擴展Future:ListenableFuture
、 SettableFuture
以及輔助類 Futures
等,方便非同步編程。為此,Java終於在JDK1.8這個版本中增加了一個能力更強的Future類:CompletableFuture
。它提供了非常強大的Future的擴展功能,可以幫助我們簡化非同步編程的複雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果。下麵來看看這幾種方式。
Netty-Future
引入Maven依賴:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.29.Final</version> </dependency>
public class NettyFutureDemo { public static void main(String[] args) throws InterruptedException { EventExecutorGroup group = new DefaultEventExecutorGroup(4); System.out.println("開始:" + DateUtils.getNow()); Future<Integer> f = group.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("開始耗時計算:" + DateUtils.getNow()); Thread.sleep(10000); System.out.println("結束耗時計算:" + DateUtils.getNow()); return 100; } }); f.addListener(new FutureListener<Object>() { @Override public void operationComplete(Future<Object> objectFuture) throws Exception { System.out.println("計算結果:" + objectFuture.get()); } }); System.out.println("結束:" + DateUtils.getNow()); // 不讓守護線程退出 new CountDownLatch(1).await(); } }
輸出結果:
開始:2019-05-16 08:25:40:779 結束:2019-05-16 08:25:40:788 開始耗時計算:2019-05-16 08:25:40:788 結束耗時計算:2019-05-16 08:25:50:789 計算結果:100
從結果可以看出,耗時計算結束後自動觸發Listener的完成方法,避免了主線程無謂的阻塞等待,那麼它究竟是怎麼做到的呢?下麵看源碼
DefaultEventExecutorGroup
實現了 EventExecutorGroup
介面,而 EventExecutorGroup
則是實現了JDK ScheduledExecutorService
介面的線程組介面,所以它擁有線程池的所有方法。然而它卻把所有返回 java.util.concurrent.Future
的方法重寫為返回 io.netty.util.concurrent.Future
,把所有返回 java.util.concurrent.ScheduledFuture
的方法重寫為返回 io.netty.util.concurrent.ScheduledFuture
。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> { /** * 返回一個EventExecutor */ EventExecutor next(); Iterator<EventExecutor> iterator(); Future<?> submit(Runnable task); <T> Future<T> submit(Runnable task, T result); <T> Future<T> submit(Callable<T> task); ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
EventExecutorGroup
的submit方法因為 newTaskFor
的重寫導致返回了netty的 Future
實現類,而這個實現類正是 PromiseTask
。
@Override public <T> Future<T> submit(Callable<T> task) { return (Future<T>) super.submit(task); } @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new PromiseTask<T>(this, callable); }
PromiseTask
的實現很簡單,它緩存了要執行的 Callable
任務,併在run方法中完成了任務調用和Listener的通知。
@Override public void run() { try { if (setUncancellableInternal()) { V result = task.call(); setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } @Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } @Override public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); }
任務調用成功或者失敗都會調用 notifyListeners
來通知Listener,所以大家得在回調的函數里調用 isSuccess
方法來檢查狀態。
這裡有一個疑惑,會不會 Future
在調用 addListener
方法的時候任務已經執行完成了,這樣子會不會通知就會失敗了啊?
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
可以發現,在Listener添加成功之後,會立即檢查狀態,如果任務已經完成立刻進行回調,所以這裡不用擔心啦。OK,下麵看看Guava-Future的實現。
Guava-Future
首先引入guava的Maven依賴:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>22.0</version> </dependency>
public class GuavaFutureDemo { public static void main(String[] args) throws InterruptedException { System.out.println("開始:" + DateUtils.getNow()); ExecutorService executorService = Executors.newFixedThreadPool(10); ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService); ListenableFuture<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("開始耗時計算:" + DateUtils.getNow()); Thread.sleep(10000); System.out.println("結束耗時計算:" + DateUtils.getNow()); return 100; } }); future.addListener(new Runnable() { @Override public void run() { System.out.println("調用成功"); } }, executorService); System.out.println("結束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } }
ListenableFuture
可以通過 addListener
方法增加回調函數,一般用於不在乎執行結果的地方。如果需要在執行成功時獲取結果或者執行失敗時獲取異常信息,需要用到 Futures
工具類的 addCallback
方法:
Futures.addCallback(future, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer result) { System.out.println("成功,計算結果:" + result); } @Override public void onFailure(Throwable t) { System.out.println("失敗"); } }, executorService);
前面提到除了 ListenableFuture
外,還有一個 SettableFuture
類也支持回調能力。它實現自 ListenableFuture
,所以擁有 ListenableFuture
的所有能力。
public class GuavaFutureDemo { public static void main(String[] args) throws InterruptedException { System.out.println("開始:" + DateUtils.getNow()); ExecutorService executorService = Executors.newFixedThreadPool(10); ListenableFuture<Integer> future = submit(executorService); Futures.addCallback(future, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer result) { System.out.println("成功,計算結果:" + result); } @Override public void onFailure(Throwable t) { System.out.println("失敗:" + t.getMessage()); } }, executorService); Thread.sleep(1000); System.out.println("結束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } private static ListenableFuture<Integer> submit(Executor executor) { SettableFuture<Integer> future = SettableFuture.create(); executor.execute(new Runnable() { @Override public void run() { System.out.println("開始耗時計算:" + DateUtils.getNow()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("結束耗時計算:" + DateUtils.getNow()); // 返回值 future.set(100); // 設置異常信息 // future.setException(new RuntimeException("custom error!")); } }); return future; } }
看起來用法上沒有太多差別,但是有一個很容易被忽略的重要問題。當 SettableFuture
的這種方式最後調用了 cancel
方法後,線程池中的任務還是會繼續執行,而通過 submit
方法返回的 ListenableFuture
方法則會立即取消執行,這點尤其要註意。下麵看看源碼:
和Netty的Future一樣,Guava也是通過實現了自定義的 ExecutorService
實現類 ListeningExecutorService
來重寫了 submit
方法。
public interface ListeningExecutorService extends ExecutorService { <T> ListenableFuture<T> submit(Callable<T> task); ListenableFuture<?> submit(Runnable task); <T> ListenableFuture<T> submit(Runnable task, T result); }
同樣的,newTaskFor
方法也被進行了重寫,返回了自定義的Future類:TrustedListenableFutureTask
@Override protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return TrustedListenableFutureTask.create(runnable, value); } @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return TrustedListenableFutureTask.create(callable); }
任務調用會走 TrustedFutureInterruptibleTask
的run方法:
@Override public void run() { TrustedFutureInterruptibleTask localTask = task; if (localTask != null) { localTask.run(); } } @Override public final void run() { if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) { return; // someone else has run or is running. } try { // 抽象方法,子類進行重寫 runInterruptibly(); } finally { if (wasInterrupted()) { while (!doneInterrupting) { Thread.yield(); } } } }
最終還是調用到 TrustedFutureInterruptibleTask
的 runInterruptibly
方法,等待任務完成後調用 set
方法。
@Override void runInterruptibly() { if (!isDone()) { try { set(callable.call()); } catch (Throwable t) { setException(t); } } } protected boolean set(@Nullable V value) { Object valueToSet = value == null ? NULL : value; // CAS設置值 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { complete(this); return true; } return false; }
在 complete
方法的最後會獲取到Listener進行回調。
上面提到的 SettableFuture
和 ListenableFuture
的 cancel
方法效果不同,原因在於一個重寫了 afterDone
方法而一個沒有。
下麵是 ListenableFuture
的 afterDone
方法:
@Override protected void afterDone() { super.afterDone(); if (wasInterrupted()) { TrustedFutureInterruptibleTask localTask = task; if (localTask != null) { localTask.interruptTask(); } } this.task = null; }
wasInterrupted
用來判斷是否調用了 cancel
(cancel方法會設置一個取消對象Cancellation到value中)
protected final boolean wasInterrupted() { final Object localValue = value; return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; }
interruptTask
方法通過線程的 interrupt
方法真正取消線程任務的執行:
final void interruptTask() { Thread currentRunner = runner; if (currentRunner != null) { currentRunner.interrupt(); } doneInterrupting = true; }
由 Callback Hell 引出 Promise 模式
如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什麼概念。
回調是一種我們推崇的非同步調用方式,但也會遇到問題,也就是回調的嵌套。當需要多個非同步回調一起書寫時,就會出現下麵的代碼(以 js 為例):
asyncFunc1(opt, (...args1) => { asyncFunc2(opt, (...args2) => { asyncFunc3(opt, (...args3) => { asyncFunc4(opt, (...args4) => { // some operation }); }); }); });
雖然在 JAVA 業務代碼中很少出現回調的多層嵌套,但總歸是個問題,這樣的代碼不易讀,嵌套太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回調地獄的問題。可能就會有人想問:java 中存在 Promise 模式嗎?答案是肯定的。
前面提到了 Netty 和 Guava 的擴展都提供了 addListener 這樣的介面,用於處理 Callback 調用,但其實 jdk1.8 已經提供了一種更為高級的回調方式:CompletableFuture。首先嘗試用 CompletableFuture 來重寫上面回調的問題。
public class CompletableFutureTest { public static void main(String[] args) throws InterruptedException { System.out.println("開始:" + DateUtils.getNow()); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("開始耗時計算:" + DateUtils.getNow()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("結束耗時計算:" + DateUtils.getNow()); return 100; }); completableFuture.whenComplete((result, e) -> { System.out.println("回調結果:" + result); }); System.out.println("結束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } }
使用CompletableFuture耗時操作沒有占用主線程的時間片,達到了非同步調用的效果。我們也不需要引入任何第三方的依賴,這都是依賴於 java.util.concurrent.CompletableFuture 的出現。CompletableFuture 提供了近 50 多個方法,大大便捷了 java 多線程操作,和非同步調用的寫法。
使用 CompletableFuture 解決回調地獄問題:
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { long l = System.currentTimeMillis(); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("在回調中執行耗時操作..."); Thread.sleep(10000); return 100; }); completableFuture = completableFuture.thenCompose(i -> { return CompletableFuture.supplyAsync(() -> { System.out.println("在回調的回調中執行耗時操作..."); Thread.sleep(10000); return i + 100; }); }); completableFuture.whenComplete((result, e) -> { System.out.println("計算結果:" + result); }); System.out.println("主線程運算耗時:" + (System.currentTimeMillis() - l) + " ms"); new CountDownLatch(1).await(); } }
輸出:
在回調中執行耗時操作...主線程運算耗時:58 ms在回調的回調中執行耗時操作...計算結果:200
使用 thenCompose 或者 thenComposeAsync 等方法可以實現回調的回調,且寫出來的方法易於維護。
總的看來,為Future模式增加回調功能就不需要阻塞等待結果的返回並且不需要消耗無謂的CPU資源去輪詢處理狀態,JDK8之前使用Netty或者Guava提供的工具類,JDK8之後則可以使用自帶的 CompletableFuture
類。Future 有兩種模式:將來式和回調式。而回調式會出現回調地獄的問題,由此衍生出了 Promise 模式來解決這個問題。這才是 Future 模式和 Promise 模式的相關性。