背景 1. 硬體的極速發展,多核心CPU司空見慣;分散式的軟體架構司空見慣; 2. 功能API大多採用混聚的方式把基礎服務的內容鏈接在一起,方便用戶生活。 拋出了兩個問題: 1. 如何發揮多核能力; 2. 切分大型任務,讓每個子任務並行運行; 併發和並行的區別 |項目|區別1|實現技術| | | | ...
背景
- 硬體的極速發展,多核心CPU司空見慣;分散式的軟體架構司空見慣;
- 功能API大多採用混聚的方式把基礎服務的內容鏈接在一起,方便用戶生活。
拋出了兩個問題:
- 如何發揮多核能力;
- 切分大型任務,讓每個子任務並行運行;
併發和並行的區別
項目 | 區別1 | 實現技術 |
---|---|---|
並行 | 每個任務跑在單獨的cpu核心上 | 分支合併框架,並行流 |
併發 | 不同任務共用cpu核心,基於時間片調度 | CompletableFuture |
Future介面
java5開始引入。將來某個時刻發生的事情進行建模。
進行一個非同步計算,返回一個執行運算的結果引用,當運算結束後,這個引用可以返回給調用方。
可以使用Future把哪些潛在耗時的任務放到非同步線程中,讓主線程繼續執行其他有價值的工作,不在白白等待。
下麵是一個例子:使用Future,可以讓兩個任務併發的運行,然後匯聚結果;
package com.test.completable;
import com.google.common.base.Stopwatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 說明:Future應用實例
* @author carter
* 創建時間: 2019年11月18日 10:53
**/
public class FutureTest {
static final ExecutorService pool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
Stopwatch stopwatch = Stopwatch.createStarted();
Future<Long> longFuture = pool.submit(() -> doSomethingLongTime());
doSomething2();
try {
final Long longValue = longFuture.get(3, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
pool.shutdown();
}
private static void doSomething2() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop());
}
private static Long doSomethingLongTime() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop());
return 1000L;
}
}
沒法編寫簡介的併發代碼。描敘能力不夠;比如如下場景:
- 將兩個非同步計算的結果合併為一個,這兩個非同步計算之間互相獨立,但是第二個有依賴第一個結果。
- 等待Future中所有的任務都完成;
- 僅等待Future集合中最快結束的任務完成,並返回它的結果;
- 通過編程的方式完成一個Future任務的執行;
- 響應Future的完成事件。
基於這個缺陷,java8中引入了CompletableFuture 類;
實現非同步API
技能點:
- 提供非同步API;
- 修改同步的API為非同步的API,如何使用流水線把兩個任務合併為一個非同步計算操作;
- 響應式的方式處理非同步操作的完成事件;
類型 | 區別 | 是否堵塞 |
---|---|---|
同步API | 調用方在被調用運行的過程中等待,被調用方運行結束後返回,調用方取得返回值後繼續運行 | 堵塞 |
非同步API | 調用方和被調用方是非同步的,調用方不用等待被調用方返回結果 | 非堵塞 |
package com.test.completable;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 說明:非同步調用計算價格的方法
* @author carter
* 創建時間: 2019年11月18日 13:32
**/
public class Test {
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
Stopwatch stopwatch = Stopwatch.createStarted();
Stopwatch stopwatch2 = Stopwatch.createStarted();
Future<Double> doubleFuture = shop.getPriceFuture("pizza");
System.out.println("getPriceFuture return after: " + stopwatch.stop());
doSomethingElse();
try{
final Double price = doubleFuture.get();
System.out.println("price is " + price + " return after: " + stopwatch2.stop());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static void doSomethingElse() {
Stopwatch stopwatch = Stopwatch.createStarted();
DelayUtil.delay();
System.out.println("doSomethingElse " + stopwatch.stop());
}
}
錯誤處理
如果計算價格的方法產生了錯誤,提示錯誤的異常會被現在在試圖計算商品價格的當前線程的範圍內,最終計算的非同步線程會被殺死,這會導致get方法返回結果的客戶端永久的被等待。
如何避免異常被掩蓋, completeExceptionally會把CompletableFuture內發生的問題拋出去。
private static void test2() {
Shop shop = new Shop("BestShop");
Stopwatch stopwatch = Stopwatch.createStarted();
Stopwatch stopwatch2 = Stopwatch.createStarted();
Future<Double> doubleFuture = shop.getPriceFutureException("pizza");
System.out.println("getPriceFuture return after: " + stopwatch.stop());
doSomethingElse();
try{
final Double price = doubleFuture.get();
System.out.println("price is " + price + " return after: " + stopwatch2.stop());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
方法改造:
//非同步方式查詢產品價格,異常拋出去
public Future<Double> getPriceFutureException(String product){
final CompletableFuture<Double> doubleCompletableFuture = new CompletableFuture<>();
new Thread(()->{try {
doubleCompletableFuture.complete(alculatePriceException(product));
}catch (Exception ex){
doubleCompletableFuture.completeExceptionally(ex);
}
}).start();
return doubleCompletableFuture;
}
無堵塞
即讓多個線程去非同步並行或者併發的執行任務,計算完之後匯聚結果;
private static void test3(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.map(item -> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)))
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test3 done in " + stopwatch.stop());
}
private static void test3_parallelStream(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.parallel()
.map(item -> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)))
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test3_parallelStream done in " + stopwatch.stop());
}
private static void test3_completableFuture(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test3_completableFuture done in " + stopwatch.stop());
}
private static void test3_completableFuture_pool(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)),pool))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test3_completableFuture done in " + stopwatch.stop());
}
代碼中有一個簡單的計算場景,我想查詢4家商店的iphone11售價;
華強北,益田蘋果店,香港九龍城,京東商城;
每一家的查詢大概耗時1s;
任務處理方式 | 耗時 | 優缺點說明 |
---|---|---|
順序執行 | 4秒多 | 簡單,好理解 |
並行流 | 1秒多 | 無法定製流內置的線程池,使用簡單,改造簡單 |
CompletableFuture 預設線程池 | 2秒多 | 預設線程池 |
CompletableFuture 指定線程池 | 1秒多 | 指定了線程池,可定製性更好,相比於並行流 |
多個非同步任務的流水線操作
場景: 先計算價格,在拿到折扣,最後計算折扣價格;
private static void test4(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.map(shop->shop.getPrice_discount(productName))
.map(Quote::parse)
.map(DisCount::applyDiscount)
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test4 done in " + stopwatch.stop());
}
private static void test4_completableFuture(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
.map(shop->CompletableFuture.supplyAsync(()->shop.getPrice_discount(productName),pool))
.map(future->future.thenApply( Quote::parse))
.map(future->future.thenCompose(quote -> CompletableFuture.supplyAsync(()->DisCount.applyDiscount(quote),pool)))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(stringList);
System.out.println("test4_completableFuture done in " + stopwatch.stop());
}
以上是有依賴關係的兩個任務的聚合,即任務2,依賴任務1的結果。使用的是thenCompose方法;
接下來如果有兩個任務可以非同步執行,最後需要依賴著兩個任務的結果計算得到最終結果,採用的是thenCombine;
//兩個不同的任務,最後需要匯聚結果,採用combine
private static void test5(String productName) {
Stopwatch stopwatch = Stopwatch.createStarted();
Shop shop = new Shop("香港九龍");
Double pricefinal = CompletableFuture.supplyAsync(()->shop.getPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(shop::getRate),(price, rate)->price * rate).join();
System.out.println("test4 done in " + stopwatch.stop());
}
completion事件
讓任務儘快結束,無需等待;
有多個服務來源,你請求多個,誰先返回,就先響應;
結果依次返回:
//等待所有的任務執行完畢; CompletableFuture.allOf()
public void findPriceStream(String productName){
List<Shop> shops = Arrays.asList(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"));
final CompletableFuture[] completableFutureArray = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool)))
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(completableFutureArray).join();
}
多個來源獲取最快的結果:
//有兩個獲取天氣的途徑,哪個快最後結果就取哪一個
public static void getWeather(){
final Object join = CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> a_weather()), CompletableFuture.supplyAsync(() -> b_weather())).join();
System.out.println(join);
}
private static String b_weather() {
DelayUtil.delay(3);
return "bWeather";
}
private static String a_weather() {
DelayUtil.delay(5);
return "aWeather";
}
源碼分析
可完備化的將來;CompletableFuture ;
先看簽名:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
實現了Futrue,CompletionStage介面;
這兩個介面簡單說明一下:
介面 | 關鍵特性 |
---|---|
Future | 直接翻譯為未來,標識把一個任務非同步執行,需要的的時候,通過get方法獲取,也可以取消cancel,此外還提供了狀態查詢方法,isDone, isCancled,實現類是FutureTask |
CompletionStage | 直接翻譯是完成的階段,提供了函數式編程方法 |
可以分為如下幾類方法
方法 | 說明 |
---|---|
thenApply(Function f) | 當前階段正常完成之後,返回一個新的階段,新的階段把當前階段的結果作為參數輸入; |
thenConsume(Consumer c), | 當前階段完成之後,結果作為參數輸入,直接消費掉,得到不返回結果的完成階段; |
thenRun(Runnable action), | 不接受參數,只是繼續執行任務,得到一個新的完成階段; |
thenCombine(otherCompletionStage,BiFunction), | 當兩個完成階段都完成的時候,執行BIFunction,返回一個新的階段; |
thenAcceptBoth(OtherCompletionStage, BiConsumer) | 兩個完成階段都完成之後,對兩個結果進行消費; |
runAfterBoth(OtherCompletionStage,Runable) | 兩個完成階段都完成之後,執行一個動作; |
applyToEither(OtherCompletionStage,Function) | 兩個完成階段的任何一個執行結束,進入函數操作,並返回一個新的階段 |
acceptEither(OtherCompletionStage,Consumer) | 兩個完成階段的任何一個執行結束,消費掉,返回一個空返回值的完成階段 |
runAfterEither(OtherCompletionStage,Runable) | 兩個完成階段的任何一個結束,執行一個動作,返回一個空返回值的完成階段 |
thenCompose(Function) | 當前階段完成,返回值作為參數,進行函數運算,然後結果作為一個新的完成階段 |
exceptionally(Function) | 無論當前階段是否正常完成,消費掉異常,然後返回值作為一個新的完成階段 |
whenComplete | |
handle | 無論當前完成階段是否正常結束,都執行一個BIFunction的函數,並返回一個新結果作為一個新的完成階段 |
toCompletableFuture | 轉換為ComplatableFuture |
裡面的實現細節後面單獨成文章再講。
小結
- 執行一些比較耗時的操作,尤其是依賴一個或者多個遠程服務的操作,可以使用非同步任務改善程式的性能,加快程式的響應速度;
- 使用CompletableFuture你可以輕鬆的實現非同步API;
- CompletableFuture提供了異常管理機制,讓主線程有機會接管子任務拋出的異常;
- 把同步API封裝到CompletableFuture中,可以非同步得到它的結果;
- 如果非同步任務之間互相獨立,而他們之間的某一些結果是另外一些的輸入,可以把這些任務進行compose;
- 可以為CompletableFuture中的任務註冊一個回調函數,當任務執行完畢之後再進行一些其它操作;
- 你可以決定什麼時候結束程式的運行,是所有的CompletableFuture任務所有對象執行完畢,或者只要其中任何一個完成即可。
原創不易,轉載請註明出處。