日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。 要知道CompletableFuture已經隨著Ja ...
日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。
要知道CompletableFuture已經隨著Java8發佈7年了,還沒有過它就有點說不過去了。
今天5分鐘帶你深入淺出CompletableFuture實用教程。
1. 使用線程池處理任務
/**
* @author yideng
* @apiNote 線程池使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
List<Future<String>> futures = new ArrayList<>();
for (Integer key : list) {
// 2. 提交任務
Future<String> future = executorService.submit(() -> {
// 睡眠一秒,模仿處理過程
Thread.sleep(1000L);
return "結果" + key;
});
futures.add(future);
}
// 3. 獲取結果
for (Future<String> future : futures) {
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
輸出結果:
結果1
結果2
結果3
一般大家都會這樣使用線程池,但是有沒有思考過這樣使用有沒有什麼問題?
反正我發現兩個比較嚴重的問題:
- 獲取結果時,調用的future.get()方法,會阻塞當前線程,直到返回結果,大大降低性能
- 有一半的代碼在寫怎麼使用線程,其實我們不應該關心怎麼使用線程,更應該關註任務的處理
有沒有具體的優化方案呢?當然有了,請出來我們今天的主角CompletableFuture
2. 使用CompletableFuture重構任務處理
看一下使用CompletableFuture改造後代碼:
/**
* @author yideng
* @apiNote CompletableFuture使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
for (Integer key : list) {
// 2. 提交任務
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結果" + key;
}, executorService).whenCompleteAsync((result, exception) -> {
// 3. 獲取結果
System.out.println(result);
});;
}
executorService.shutdown();
// 由於whenCompleteAsync獲取結果的方法是非同步的,所以要阻塞當前線程才能輸出結果
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出結果:
結果1
結果2
結果3
代碼中使用了CompletableFuture的兩個方法,
supplyAsync()方法作用是提交非同步任務,有兩個傳參,任務和自定義線程池。
whenCompleteAsync()方法作用是非同步獲取結果,也有兩個傳參,結果和異常信息。
代碼經過CompletableFuture改造後,是多麼的簡潔優雅。
提交任務也不用再關心線程池是怎麼使用了,獲取結果也不用再阻塞當前線程了。
如果你比較倔強,還想同步獲取結果,可以使用whenComplete()方法,或者單獨調用join()方法。
join()方法配合Stream流是這樣用的:
/**
* @author yideng
* @apiNote CompletableFuture使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交任務
List<String> results = list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結果" + key;
}, executorService))
.map(CompletableFuture::join).collect(Collectors.toList());
executorService.shutdown();
// 3. 獲取結果
System.out.println(results);
}
}
輸出結果:
[結果1,結果2,結果3]
多麼的簡潔優雅啊!原來executorService.submit()這種使用線程池的方式,可以徹底丟掉了。
3. CompletableFuture更多妙用
3.1 等待所有任務執行完成
如果讓你實現等待所有任務線程執行完成,再進行下一步操作,你會怎麼做?
我猜你一定會使用 線程池+CountDownLatch,像下麵這樣:
/**
* @author yideng
* @apiNote 線程池和CountDownLatch使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Integer key : list) {
// 2. 提交任務
executorService.execute(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
System.out.println("結果" + key);
countDownLatch.countDown();
});
}
executorService.shutdown();
// 3. 阻塞等待所有任務執行完成
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
}
}
輸出結果:
結果2
結果3
結果1
Low不Low?十年前可以這樣寫,Java8都已經發佈7年了,你還不會用Java8的寫法?看一下使用CompletableFuture是怎麼重構的:
/**
* @author yideng
* @apiNote CompletableFuture.allOf()方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交任務,並調用join()阻塞等待所有任務執行完成
CompletableFuture
.allOf(
list.stream().map(key ->
CompletableFuture.runAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
System.out.println("結果" + key);
}, executorService))
.toArray(CompletableFuture[]::new))
.join();
executorService.shutdown();
}
}
輸出結果:
結果3
結果1
結果2
代碼看著有點亂,其實邏輯很清晰。
- 遍歷list集合,提交CompletableFuture任務,把結果轉換成數組
- 再把數組放到CompletableFuture的allOf()方法裡面
- 最後調用join()方法阻塞等待所有任務執行完成
CompletableFuture的allOf()方法的作用就是,等待所有任務處理完成。
這樣寫是不是簡潔優雅了許多?
3.2 任何一個任務處理完成就返回
如果要實現這樣一個需求,往線程池提交一批任務,只要有其中一個任務處理完成就返回。
該怎麼做?如果你手動實現這個邏輯的話,代碼肯定複雜且低效,有了CompletableFuture就非常簡單了,只需調用anyOf()方法就行了。
/**
* @author yideng
* @apiNote CompletableFuture.anyOf()方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
long start = System.currentTimeMillis();
// 2. 提交任務
CompletableFuture<Object> completableFuture = CompletableFuture
.anyOf(
list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結果" + key;
}, executorService))
.toArray(CompletableFuture[]::new));
executorService.shutdown();
// 3. 獲取結果
System.out.println(completableFuture.join());
}
}
輸出結果:
結果3
一切都是那麼簡單優雅。
3.3 一個線程執行完成,交給另一個線程接著執行
有這麼一個需求:
一個線程處理完成,把處理的結果交給另一個線程繼續處理,怎麼實現?
你是不是想到了一堆工具,線程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,該怎麼進行組合使用呢?AB組合還是BC組合?
別瞎想了,你寫的肯定沒有CompletableFuture好用,看一下CompletableFuture是怎麼用的:
/**
* @author yideng
* @apiNote CompletableFuture線程接力處理示例
*/
public class ThreadDemo {
public static void main(String[] args) {
// 1. 創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 2. 提交任務,並調用join()阻塞等待任務執行完成
String result2 = CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return "結果1";
}, executorService).thenApplyAsync(result1 -> {
// 睡眠一秒,模仿處理過程
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
return result1 + "結果2";
}, executorService).join();
executorService.shutdown();
// 3. 獲取結果
System.out.println(result2);
}
}
輸出結果:
結果1結果2
代碼主要用到了CompletableFuture的thenApplyAsync()方法,作用就是非同步處理上一個線程的結果。
是不是太方便了?
這麼好用的CompletableFuture還有沒有其他功能?當然有。
4. CompletableFuture常用API
4.1 CompletableFuture常用API說明
-
提交任務
supplyAsync
runAsync -
接力處理
thenRun thenRunAsync
thenAccept thenAcceptAsync
thenApply thenApplyAsync
handle handleAsync
applyToEither applyToEitherAsync
acceptEither acceptEitherAsync
runAfterEither runAfterEitherAsync
thenCombine thenCombineAsync
thenAcceptBoth thenAcceptBothAsync
API太多,有點眼花繚亂,很容易分類。
帶run的方法,無入參,無返回值。
帶accept的方法,有入參,無返回值。
帶supply的方法,無入參,有返回值。
帶apply的方法,有入參,有返回值。
帶handle的方法,有入參,有返回值,並且帶異常處理。
以Async結尾的方法,都是非同步的,否則是同步的。
以Either結尾的方法,只需完成任意一個。
以Both/Combine結尾的方法,必須所有都完成。
- 獲取結果
join 阻塞等待,不會拋異常
get 阻塞等待,會拋異常
complete(T value) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,則返回傳參value。
completeExceptionally(Throwable ex) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,拋異常。
4. CompletableFuture常用API使用示例
用最常見的煮飯來舉例:
4.1 then、handle方法使用示例
/**
* @author yideng
* @apiNote then、handle方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1. 開始淘米");
return "2. 淘米完成";
}).thenApplyAsync(result -> {
System.out.println(result);
System.out.println("3. 開始煮飯");
// 生成一個1~10的隨機數
if (RandomUtils.nextInt(1, 10) > 5) {
throw new RuntimeException("4. 電飯煲壞了,煮不了");
}
return "4. 煮飯完成";
}).handleAsync((result, exception) -> {
if (exception != null) {
System.out.println(exception.getMessage());
return "5. 今天沒飯吃";
} else {
System.out.println(result);
return "5. 開始吃飯";
}
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
輸出結果可能是:
1. 開始淘米
2. 淘米完成
3. 開始煮飯
4. 煮飯完成
5. 開始吃飯
也可能是:
1. 開始淘米
2. 淘米完成
3. 開始煮飯
java.lang.RuntimeException: 4. 電飯煲壞了,煮不了
5. 今天沒飯吃
4.2 complete方法使用示例
/**
* @author yideng
* @apiNote complete使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "飯做好了";
});
//try {
// Thread.sleep(1L);
//} catch (InterruptedException e) {
//}
completableFuture.complete("飯還沒做好,我點外賣了");
System.out.println(completableFuture.join());
}
}
輸出結果:
飯還沒做好,我點外賣了
如果把註釋的sleep()方法放開,輸出結果就是:
飯做好了
4.3 either方法使用示例
/**
* @author yideng
* @apiNote either方法使用示例
*/
public class ThreadDemo {
public static void main(String[] args) {
CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {
return "飯做好了";
});
CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {
return "外賣到了";
});
// 飯先做好,就吃飯。外賣先到,就吃外賣。就是這麼任性。
CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {
return myMeal;
});
System.out.println(completableFuture.join());
}
}
輸出結果可能是:
飯做好了
也可能是:
外賣到了
學會了嗎?開發中趕快用起來吧!