CompletableFuture對象是JDK1.8版本新引入的類,這個類實現了兩個介面,一個是Future介面,一個是CompletionStage介面。 ...
1、CompletableFuture介紹
CompletableFuture對象是JDK1.8版本新引入的類,這個類實現了兩個介面,一個是Future介面,一個是CompletionStage介面。
CompletionStage介面是JDK1.8版本提供的介面,用於非同步執行中的階段處理,CompletionStage定義了一組介面用於在一個階段執行結束之後,要麼繼續執行下一個階段,要麼對結果進行轉換產生新的結果等,一般來說要執行下一個階段都需要上一個階段正常完成,這個類也提供了對異常結果的處理介面
2、CompletableFuture的API
2.1 提交任務
在CompletableFuture中提交任務有以下幾種方式:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
這四個方法都是用來提交任務的,不同的是supplyAsync提交的任務有返回值,runAsync提交的任務沒有返回值。兩個介面都有一個重載的方法,第二個入參為指定的線程池,如果不指定,則預設使用ForkJoinPool.commonPool()線程池。在使用的過程中儘量根據不同的業務來指定不同的線程池,方便對不同線程池進行監控,同時避免業務共用線程池相互影響。
2.2 結果轉換
2.2.1 thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
thenApply這一組函數入參是Function,意思是將上一個CompletableFuture執行結果作為入參,再次進行轉換或者計算,重新返回一個新的值。
2.2.2 handle
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
handle這一組函數入參是BiFunction,該函數式介面有兩個入參一個返回值,意思是處理上一個CompletableFuture的處理結果,同時如果有異常,需要手動處理異常。
2.2.3 thenRun
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
thenRun這一組函數入參是Runnable函數式介面,該介面無需入參和出參,這一組函數是在上一個CompletableFuture任務執行完成後,在執行另外一個介面,不需要上一個任務的結果,也不需要返回值,只需要在上一個任務執行完成後執行即可。
2.2.4 thenAccept
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
thenAccept這一組函數的入參是Consumer,該函數式介面有一個入參,沒有返回值,所以這一組介面的意思是處理上一個CompletableFuture的處理結果,但是不返回結果。
2.2.5 thenAcceptBoth
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
thenAcceptBoth這一組函數入參包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的介面,在JDK中只有一個實現類:CompletableFuture,所以第一個入參就是CompletableFuture,這一組函數是用來接受兩個CompletableFuture的返回值,並將其組合到一起。BiConsumer這個函數式介面有兩個入參,並且沒有返回值,BiConsumer的第一個入參就是調用方CompletableFuture的執行結果,第二個入參就是thenAcceptBoth介面入參的CompletableFuture的執行結果。所以這一組函數意思是將兩個CompletableFuture執行結果合併到一起。
2.2.6 thenCombine
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
thenCombine這一組函數和thenAcceptBoth類似,入參都包含一個CompletionStage,也就是CompletableFuture對象,意思也是組合兩個CompletableFuture的執行結果,不同的是thenCombine的第二個入參為BiFunction,該函數式介面有兩個入參,同時有一個返回值。所以與thenAcceptBoth不同的是,thenCombine將兩個任務結果合併後會返回一個全新的值作為出參。
2.2.7 thenCompose
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
thenCompose這一組函數意思是將調用方的執行結果作為Function函數的入參,同時返回一個新的CompletableFuture對象。
2.3 回調方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
whenComplete方法意思是當上一個CompletableFuture對象任務執行完成後執行該方法。BiConsumer函數式介面有兩個入參沒有返回值,這兩個入參第一個是CompletableFuture任務的執行結果,第二個是異常信息。表示處理上一個任務的結果,如果有異常,則需要手動處理異常,與handle方法的區別在於,handle方法的BiFunction是有返回值的,而BiConsumer是沒有返回值的。
以上方法都有一個帶有Async的方法,帶有Async的方法表示是非同步執行的,會將該任務放到線程池中執行,同時該方法會有一個重載的方法,最後一個參數為Executor,表示非同步執行可以指定線程池執行。為了方便進行控制,最好在使用CompletableFuture時手動指定我們的線程池。
2.4 異常處理
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
exceptionally是用來處理異常的,當任務拋出異常後,可以通過exceptionally來進行處理,也可以選擇使用handle來進行處理,不過兩者有些不同,hand是用來處理上一個任務的結果,如果有異常情況,就處理異常。而exceptionally可以放在CompletableFuture處理的最後,作為兜底邏輯來處理未知異常。
2.5 獲取結果
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf是需要入參中所有的CompletableFuture任務執行完成,才會進行下一步;
anyOf是入參中任何一個CompletableFuture任務執行完成都可以執行下一步。
public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()
get方法一個是不帶超時時間的,一個是帶有超時時間的。
getNow方法則是立即返回結果,如果還沒有結果,則返回預設值,也就是該方法的入參。
join方法是不帶超時時間的等待任務完成。
3、CompletableFuture原理
join方法同樣表示獲取結果,但是join與get方法有什麼區別呢。
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
以上是CompletableFuture類中兩個方法的代碼,可以看到兩個方法幾乎一樣。區別在於reportJoin/reportGet,waitingGet方法是一致的,只不過參數不一樣,我們在看下reportGet與reportJoin方法。
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
private static <T> T reportJoin(Object r) {
if (r instanceof AltResult) {
Throwable x;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if (x instanceof CompletionException)
throw (CompletionException)x;
throw new CompletionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
可以看到這兩個方法很相近,reportGet方法判斷了r對象是否為空,並拋出了中斷異常,而reportJoin方法沒有判斷,同時reportJoin拋出的都是運行時異常,所以join方法也是無需手動捕獲異常的。
我們在看下waitingGet方法
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
該waitingGet方法是通過while的方式迴圈判斷是否任務已經完成並產生結果,如果結果為空,則會一直在這裡迴圈,這裡需要註意的是在這裡初始化了一下spins=-1,當第一次進入while迴圈的時候,spins是-1,這時會將spins賦值為一個常量,該常量為SPINS。
private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1 << 8 : 0);
這裡判斷可用CPU數是否大於1,如果大於1,則該常量為 1<< 8,也就是256,否則該常量為0。
第二次進入while迴圈的時候,spins是256大於0,這裡做了減一的操作,下次進入while迴圈,如果還沒有結果,依然是大於0繼續做減一的操作,此處用來做短時間的自旋等待結果,只有當spins等於0,後續會進入正常流程判斷。
我們在看下timedGet方法的源碼
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
timedGet方法依然是通過while迴圈的方式來判斷是否已經完成,timedGet方法入參為一個納秒值,並通過該值計算出一個deadline截止時間,當while迴圈還未獲取到任務結果且已經達到截止時間,則拋出一個TimeoutException異常。
4、CompletableFuture實現多線程任務
這裡我們通過CompletableFuture來實現一個多線程處理非同步任務的例子。
這裡我們創建10個任務提交到我們指定的線程池中執行,並等待這10個任務全部執行完畢。
每個任務的執行流程為第一次先執行加法,第二次執行乘法,如果發生異常則返回預設值,當10個任務執行完成後依次列印每個任務的結果。
public void demo() throws InterruptedException, ExecutionException, TimeoutException {
// 1、自定義線程池
ExecutorService executorService = new ThreadPoolExecutor(5, 10,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
// 2、集合保存future對象
List<CompletableFuture<Integer>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integer> future = CompletableFuture
// 提交任務到指定線程池
.supplyAsync(() -> this.addValue(finalI), executorService)
// 第一個任務執行結果在此處進行處理
.thenApplyAsync(k -> this.plusValue(finalI, k), executorService)
// 任務執行異常時處理異常並返回預設值
.exceptionally(e -> this.defaultValue(finalI, e));
// future對象添加到集合中
futures.add(future);
}
// 3、等待所有任務執行完成,此處最好加超時時間
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
for (CompletableFuture<Integer> future : futures) {
Integer num = future.get();
System.out.println("任務執行結果為:" + num);
}
System.out.println("任務全部執行完成!");
}
private Integer addValue(Integer index) {
System.out.println("第" + index + "個任務第一次執行");
if (index == 3) {
int value = index / 0;
}
return index + 3;
}
private Integer plusValue(Integer index, Integer num) {
System.out.println("第" + index + "個任務第二次執行,上次執行結果:" + num);
return num * 10;
}
private Integer defaultValue(Integer index, Throwable e) {
System.out.println("第" + index + "個任務執行異常!" + e.getMessage());
e.printStackTrace();
return 10;
}
作者:京東物流 丁冬
來源:京東雲開發者社區 自猿其說Tech