在上篇文章中,我們介紹了Future相關的用法,使用它可以獲取非同步任務執行的返回值。我們再次回顧一下Future相關的用法。 ...
一、摘要
在上篇文章中,我們介紹了Future
相關的用法,使用它可以獲取非同步任務執行的返回值。
我們再次回顧一下Future
相關的用法。
public class FutureTest {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// 創建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(1);
// 提交任務並獲得Future的實例
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// 執行下載某文件任務,並返迴文件名稱
System.out.println("thread name:" + Thread.currentThread().getName() + " 開始執行下載任務");
Thread.sleep(200);
return "xxx.png";
}
});
//模擬主線程其它操作耗時
Thread.sleep(300);
// 通過阻塞方式,從Future中獲取非同步執行返回的結果
String result = future.get();
System.out.println("任務執行結果:" + result);
System.out.println("總共用時:" + (System.currentTimeMillis() - startTime) + "ms");
// 任務執行完畢之後,關閉線程池
executor.shutdown();
}
}
運行結果如下:
thread name:pool-1-thread-1 開始執行下載任務
任務執行結果:xxx.png
總共用時:308ms
如果不採用線程執行,那麼總共用時應該會是 200 + 300 = 500 ms,而採用線程來非同步執行,總共用時是 308 ms。不難發現,通過Future
和線程池的搭配使用,可以有效的提升程式的執行效率。
但是Future
對非同步執行結果的獲取並不是很友好,要麼調用阻塞方法get()
獲取結果,要麼輪訓調用isDone()
方法是否等於true
來判斷任務是否執行完畢來獲取結果,這兩種方法都不算很好,因為主線程會被迫等待。
因此,從 Java 8 開始引入了CompletableFuture
,它針對Future
做了很多的改進,在實現Future
介面相關功能之外,還支持傳入回調對象,當非同步任務完成或者發生異常時,自動調用回調對象方法。
下麵我們一起來看看CompletableFuture
相關的用法!
二、CompletableFuture 用法介紹
我們還是以上面的例子為例,改用CompletableFuture
來實現,內容如下:
public class FutureTest2 {
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf = CompletableFuture.supplyAsync(FutureTest2::download);
// 如果執行成功,回調此方法
cf.thenAccept((result) -> {
System.out.println("任務執行成功,返回結果值:" + result);
});
// 如果執行異常,回調此方法
cf.exceptionally((e) -> {
System.out.println("任務執行失敗,原因:" + e.getMessage());
return null;
});
//模擬主線程其它操作耗時
Thread.sleep(300);
}
/**
* 下載某個任務
* @return
*/
private static String download(){
// 執行下載某文件任務,並返迴文件名稱
System.out.println("thread name:" + Thread.currentThread().getName() + " 開始執行下載任務");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "xxx.png";
}
}
運行結果如下:
thread name:ForkJoinPool.commonPool-worker-1 開始執行下載任務
任務執行成功,返回結果值:xxx.png
可以發現,採用CompletableFuture
類的supplyAsync()
方法進行非同步編程,代碼上簡潔了很多,不需要單獨創建線程池。
實際上,CompletableFuture
也使用了線程池來執行任務,部分核心源碼如下:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// 判斷當前機器 cpu 可用邏輯核心數是否大於1
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
// 預設採用的線程池
// 如果useCommonPool = true,採用 ForkJoinPool.commonPool 線程池
// 如果useCommonPool = false,採用 ThreadPerTaskExecutor 執行器
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// ThreadPerTaskExecutor執行器類
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
// 非同步執行任務的方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
// 非同步執行任務的方法,支持傳入自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
}
從源碼上可以分析出如下幾點:
- 當前機器 cpu 可用邏輯核心數大於 1,預設會採用
ForkJoinPool.commonPool()
線程池來執行任務 - 當前機器 cpu 可用邏輯核心數等於 1,預設會採用
ThreadPerTaskExecutor
類來執行任務,它是個一對一執行器,每提交一個任務會創建一個新的線程來執行 - 同時也支持用戶傳入自定義線程池來非同步執行任務
其中ForkJoinPool
線程池是從 JDK 1.7 版本引入的,它是一個全新的線程池,後面在介紹Fork/Join
框架文章中對其進行介紹。
除此之外,CompletableFuture
為開發者還提供了幾十種方法,以便滿足更多的非同步任務執行的場景。這些方法包括創建非同步任務、任務非同步回調、多個任務組合處理等內容,下麵我們就一起來學習一下相關的使用方式。
2.1、創建非同步任務
CompletableFuture
創建非同步任務,常用的方法有兩個。
runAsync()
:執行非同步任務時,沒有返回值supplyAsync()
:執行非同步任務時,可以帶返回值
runAsync()
和supplyAsync()
方法相關的源碼如下:
// 使用預設內置線程池執行任務,根據runnable構建執行任務,無返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 使用自定義線程池執行任務,根據runnable構建執行任務,無返回值
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 使用預設內置線程池執行任務,根據supplyAsync構建執行任務,可以帶返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 使用自定義線程池執行任務,根據supplyAsync構建執行任務,可以帶返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
兩者都支持使用自定義的線程池來執行任務,稍有不同的是supplyAsync()
方法的入參使用的是Supplier
介面,它表示結果的提供者,該結果返回一個對象且不接受任何參數,支持通過 lambda 語法簡寫。
下麵我們一起來看看相關的使用示例!
2.1.1、runAsync 使用示例
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<Void> cf = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("runAsync,執行完畢");
}
});
System.out.println("runAsync,任務執行結果:" + cf.get());
}
輸出結果:
runAsync,執行完畢
runAsync,任務執行結果:null
2.1.2、supplyAsync 使用示例
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
return "hello world";
});
System.out.println("supplyAsync,任務執行結果:" + cf.get());
}
輸出結果:
supplyAsync,執行完畢
supplyAsync,任務執行結果:hello world
2.2、任務非同步回調
當創建的非同步任務執行完畢之後,我們希望拿著上一個任務的執行結果,繼續執行後續的任務,此時就可以採用回調方法來處理。
CompletableFuture
針對任務非同步回調做了很多的支持,常用的方法如下:
thenRun()/thenRunAsync()
:它表示上一個任務執行成功後的回調方法,無入參,無返回值thenAccept()/thenAcceptAsync()
:它表示上一個任務執行成功後的回調方法,有入參,無返回值thenApply()/thenApplyAsync()
:它表示上一個任務執行成功後的回調方法,有入參,有返回值whenComplete()/whenCompleteAsync()
:它表示任務執行完成後的回調方法,有入參,無返回值handle()/handleAsync()
:它表示任務執行完成後的回調方法,有入參,有返回值exceptionally()
:它表示任務執行異常後的回調方法
下麵我們一起來看看相關的使用示例!
2.2.1、thenRun/thenRunAsync
thenRun()/thenRunAsync()
方法,都表示上一個任務執行成功後的回調處理,無入參,無返回值。稍有不同的是,thenRunAsync()
方法會採用獨立的線程池來執行任務。
相關的源碼方法如下:
// 預設線程池
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// 採用與上一個任務的線程池來執行任務
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
// 採用預設線程池來執行任務
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
從源碼上可以清晰的看到,thenRun()/thenRunAsync()
方法都調用了uniRunStage()
方法,不同的是thenRunAsync()
使用了asyncPool
參數,也就是預設的線程池;而thenRun()
方法使用的是null
,底層採用上一個任務的線程池來執行,總結下來就是:
- 當調用
thenRun()
方法執行任務時,當前任務和上一個任務都共用同一個線程池 - 當調用
thenRunAsync()
方法執行任務時,上一個任務採用自己的線程池來執行;而當前任務會採用預設線程池來執行,比如ForkJoinPool
。
thenAccept()/thenAcceptAsync()
、thenApply()/thenApplyAsync()
、whenComplete()/whenCompleteAsync()
、handle()/handleAsync()
方法之間的區別也類似,下文不再重覆講解。
下麵我們一起來看看thenRun()
方法的使用示例。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
return "hello world";
});
// 當上一個任務執行成功,會繼續回調當前方法
CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
System.out.println("thenRun1,執行完畢");
});
CompletableFuture<Void> cf3 = cf2.thenRun(() -> {
System.out.println("thenRun2,執行完畢");
});
System.out.println("任務執行結果:" + cf3.get());
}
輸出結果:
supplyAsync,執行完畢
thenRun1,執行完畢
thenRun2,執行完畢
任務執行結果:null
如果上一個任務執行異常,是不會回調thenRun()
方法的,示例如下:
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
if(1 == 1){
throw new RuntimeException("執行異常");
}
return "hello world";
});
// 當上一個任務執行成功,會繼續回調當前方法
CompletableFuture<Void> cf1 = cf.thenRun(() -> {
System.out.println("thenRun1,執行完畢");
});
// 監聽執行時異常的回調方法
CompletableFuture<Void> cf2 = cf1.exceptionally((e) -> {
System.out.println("發生異常,錯誤信息:" + e.getMessage());
return null;
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync,執行完畢
發生異常,錯誤信息:java.lang.RuntimeException: 執行異常
任務執行結果:null
可以清晰的看到,thenRun()
方法沒有回調。
thenAccept()
、thenAcceptAsync()
、thenApply()
、thenApplyAsync()
方法也類似,當上一個任務執行異常,不會回調這些方法。
2.2.2、thenAccept/thenAcceptAsync
thenAccept()/thenAcceptAsync()
方法,表示上一個任務執行成功後的回調方法,有入參,無返回值。
相關的示例如下。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
return "hello world";
});
// 當上一個任務執行成功,會繼續回調當前方法
CompletableFuture<Void> cf2 = cf1.thenAccept((r) -> {
System.out.println("thenAccept,執行完畢,上一個任務執行結果值:" + r);
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync,執行完畢
thenAccept,執行完畢,上一個任務執行結果值:hello world
任務執行結果:null
2.2.3、thenApply/thenApplyAsync
thenApply()/thenApplyAsync()
方法,表示上一個任務執行成功後的回調方法,有入參,有返回值。
相關的示例如下。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
return "hello world";
});
// 當上一個任務執行成功,會繼續回調當前方法
CompletableFuture<String> cf2 = cf1.thenApply((r) -> {
System.out.println("thenApply,執行完畢,上一個任務執行結果值:" + r);
return "gogogo";
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync,執行完畢
thenApply,執行完畢,上一個任務執行結果值:hello world
任務執行結果:gogogo
2.2.4、whenComplete/whenCompleteAsync
whenComplete()/whenCompleteAsync()
方法,表示任務執行完成後的回調方法,有入參,無返回值。
稍有不同的是:無論任務執行成功還是失敗,它都會回調。
相關的示例如下。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
if(1 == 1){
throw new RuntimeException("執行異常");
}
return "hello world";
});
// 當任務執行完成,會繼續回調當前方法
CompletableFuture<String> cf2 = cf1.whenComplete((r, e) -> {
System.out.println("whenComplete,執行完畢,上一個任務執行結果值:" + r + ",異常信息:" + e.getMessage());
});
// 監聽執行時異常的回調方法
CompletableFuture<String> cf3 = cf2.exceptionally((e) -> {
System.out.println("發生異常,錯誤信息:" + e.getMessage());
return e.getMessage();
});
System.out.println("任務執行結果:" + cf3.get());
}
輸出結果:
supplyAsync,執行完畢
whenComplete,執行完畢,上一個任務執行結果值:null,異常信息:java.lang.RuntimeException: 執行異常
發生異常,錯誤信息:java.lang.RuntimeException: 執行異常
任務執行結果:java.lang.RuntimeException: 執行異常
2.2.5、handle/handleAsync
handle()/handleAsync()
方法,表示任務執行完成後的回調方法,有入參,有返回值。
同樣的,無論任務執行成功還是失敗,它都會回調。
相關的示例如下。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行完畢");
if(1 == 1){
throw new RuntimeException("執行異常");
}
return "hello world";
});
// 當任務執行完成,會繼續回調當前方法
CompletableFuture<String> cf2 = cf1.handle((r, e) -> {
System.out.println("handle,執行完畢,上一個任務執行結果值:" + r + ",異常信息:" + e.getMessage());
return "handle";
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync,執行完畢
handle,執行完畢,上一個任務執行結果值:null,異常信息:java.lang.RuntimeException: 執行異常
任務執行結果:handle
2.2.6、exceptionally
exceptionally()
方法,表示任務執行異常後的回調方法。在上文的示例中有所介紹。
最後我們還是簡單的看下示例。
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,執行開始");
if(1 == 1){
throw new RuntimeException("執行異常");
}
return "hello world";
});
// 監聽執行時異常的回調方法
CompletableFuture<String> cf2 = cf1.exceptionally((e) -> {
System.out.println("發生異常,錯誤信息:" + e.getMessage());
return e.getMessage();
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync,執行開始
發生異常,錯誤信息:java.lang.RuntimeException: 執行異常
任務執行結果:java.lang.RuntimeException: 執行異常
2.3、多個任務組合處理
某些場景下,如果希望獲取兩個不同的非同步執行結果進行組合處理,可以採用多個任務組合處理方式。
CompletableFuture
針對多個任務組合處理做了很多的支持,常用的組合方式有以下幾種。
AND組合
:表示將兩個CompletableFuture
任務組合起來,只有這兩個任務都正常執行完了,才會繼續執行回調任務,比如thenCombine()
方法OR組合
:表示將兩個CompletableFuture
任務組合起來,只要其中一個正常執行完了,就會繼續執行回調任務,比如applyToEither
方法AllOf組合
:可以將多個CompletableFuture
任務組合起來,只有所有的任務都正常執行完了,才會繼續執行回調任務,比如allOf()
方法AnyOf組合
:可以將多個CompletableFuture
任務組合起來,只要其中一個任務正常執行完了,就會繼續執行回調任務,比如anyOf()
方法
下麵我們一起來看看相關的使用示例!
2.3.1、AND組合
實現AND組合
的操作方法有很多,比如runAfterBoth()
、thenAcceptBoth()
、thenCombine()
等方法,它們之間的區別在於:是否帶有入參、是否帶有返回值。
其中thenCombine()
方法支持傳入參、帶返回值。
相關示例如下:
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,執行完畢");
return "supplyAsync1";
});
CompletableFuture<String> cf2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync2,執行完畢");
return "supplyAsync2";
})
.thenCombine(cf1, (r1, r2) -> {
System.out.println("r1任務執行結果:" + r1);
System.out.println("r2任務執行結果:" + r2);
return r1 + "_" + r2;
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync1,執行完畢
supplyAsync2,執行完畢
r1任務執行結果:supplyAsync2
r2任務執行結果:supplyAsync1
任務執行結果:supplyAsync2_supplyAsync1
2.3.2、OR組合
實現OR組合
的操作方法有很多,比如runAfterEither()
、acceptEither()
、applyToEither()
等方法,區別同上。
其中applyToEither()
方法支持傳入參、帶返回值。
相關示例如下:
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,執行完畢");
return "supplyAsync1";
});
CompletableFuture<String> cf2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync2,執行完畢");
return "supplyAsync2";
})
.applyToEither(cf1, (r) -> {
System.out.println("第一個執行成功的任務結果:" + r);
return r + "_applyToEither";
});
System.out.println("任務執行結果:" + cf2.get());
}
輸出結果:
supplyAsync1,執行完畢
supplyAsync2,執行完畢
第一個執行成功的任務結果:supplyAsync2
任務執行結果:supplyAsync2_applyToEither
2.3.2、AllOf組合
實現AllOf組合
的操作就一個方法allOf()
,可以將多個任務進行組合,只有都執行成功才會回調,回調入參為空值。
相關示例如下:
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,執行完畢");
return "supplyAsync1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,執行完畢");
return "supplyAsync2";
});
// 將多個任務,進行AND組合
CompletableFuture<String> cf3 = CompletableFuture
.allOf(cf1, cf2)
.handle((r, e) -> {
System.out.println("所有任務都執行成功,result:" + r);
return "over";
});
System.out.println(cf3.get());
}
輸出結果:
supplyAsync1,執行完畢
supplyAsync2,執行完畢
所有任務都執行成功,result:null
over
2.3.3、AnyOf組合
實現AnyOf組合
的操作,同樣就一個方法anyOf()
,可以將多個任務進行組合,只要一個執行成功就會回調,回調入參有值。
相關示例如下:
public static void main(String[] args) throws Exception {
// 創建非同步執行任務
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,執行完畢");
return "supplyAsync1";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,執行完畢");
return "supplyAsync2";
});
// 將多個任務,進行AND組合
CompletableFuture<String> cf3 = CompletableFuture
.anyOf(cf1, cf2)
.handle((r, e) -> {
System.out.println("某個任務執行成功,返回值:" + r);
return "over";
});
System.out.println(cf3.get());
}
輸出結果:
supplyAsync1,執行完畢
supplyAsync2,執行完畢
某個任務執行成功,返回值:supplyAsync1
over
三、小結
本文主要圍繞CompletableFuture
類相關用法進行了一次知識總結,通過CompletableFuture
類可以簡化非同步編程,同時支持多種非同步任務,按照條件組合處理,相比其它的併發工具類,操作更加強大、實用。
本篇內容比較多,如果有描述不對的地方,歡迎網友留言指出,希望本文知識總結能幫助到大家。
四、參考
1.https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
2.https://juejin.cn/post/6970558076642394142
作者:程式員志哥
出處:pzblog.cn
資源:微信搜【程式員志哥】關註我,回覆 【技術資料】有我準備的一線程式必備電腦書籍、大廠面試資料和免費電子書。 希望可以幫助大家提升技術和能力。