java8-CompleableFuture的使用1

来源:https://www.cnblogs.com/snidget/archive/2019/11/19/11892097.html
-Advertisement-
Play Games

背景 1. 硬體的極速發展,多核心CPU司空見慣;分散式的軟體架構司空見慣; 2. 功能API大多採用混聚的方式把基礎服務的內容鏈接在一起,方便用戶生活。 拋出了兩個問題: 1. 如何發揮多核能力; 2. 切分大型任務,讓每個子任務並行運行; 併發和並行的區別 |項目|區別1|實現技術| | | | ...


背景

  1. 硬體的極速發展,多核心CPU司空見慣;分散式的軟體架構司空見慣;
  2. 功能API大多採用混聚的方式把基礎服務的內容鏈接在一起,方便用戶生活。

拋出了兩個問題:

  1. 如何發揮多核能力;
  2. 切分大型任務,讓每個子任務並行運行;

併發和並行的區別

項目 區別1 實現技術
並行 每個任務跑在單獨的cpu核心上 分支合併框架,並行流
併發 不同任務共用cpu核心,基於時間片調度 CompletableFuture

Future介面

java5開始引入。將來某個時刻發生的事情進行建模。
進行一個非同步計算,返回一個執行運算的結果引用,當運算結束後,這個引用可以返回給調用方。
可以使用Future把哪些潛在耗時的任務放到非同步線程中,讓主線程繼續執行其他有價值的工作,不在白白等待。

下麵是一個例子:使用Future,可以讓兩個任務併發的運行,然後匯聚結果;

package com.test.completable;

import com.google.common.base.Stopwatch;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 說明:Future應用實例
 * @author carter
 * 創建時間: 2019年11月18日 10:53
 **/

public class FutureTest {

    static final ExecutorService pool = Executors.newFixedThreadPool(2);


    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();

        Future<Long> longFuture = pool.submit(() -> doSomethingLongTime());

        doSomething2();
        try {
            final Long longValue = longFuture.get(3, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }

    private static void doSomething2() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop());
    }

    private static Long doSomethingLongTime() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop());
        return 1000L;
    }


}

沒法編寫簡介的併發代碼。描敘能力不夠;比如如下場景:

  1. 將兩個非同步計算的結果合併為一個,這兩個非同步計算之間互相獨立,但是第二個有依賴第一個結果。
  2. 等待Future中所有的任務都完成;
  3. 僅等待Future集合中最快結束的任務完成,並返回它的結果;
  4. 通過編程的方式完成一個Future任務的執行;
  5. 響應Future的完成事件。

基於這個缺陷,java8中引入了CompletableFuture 類;

實現非同步API

技能點:

  1. 提供非同步API;
  2. 修改同步的API為非同步的API,如何使用流水線把兩個任務合併為一個非同步計算操作;
  3. 響應式的方式處理非同步操作的完成事件;
類型 區別 是否堵塞
同步API 調用方在被調用運行的過程中等待,被調用方運行結束後返回,調用方取得返回值後繼續運行 堵塞
非同步API 調用方和被調用方是非同步的,調用方不用等待被調用方返回結果 非堵塞
package com.test.completable;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 說明:非同步調用計算價格的方法
 * @author carter
 * 創建時間: 2019年11月18日 13:32
 **/

public class Test {

    public static void main(String[] args) {
        Shop shop = new Shop("BestShop");

        Stopwatch stopwatch = Stopwatch.createStarted();
        Stopwatch stopwatch2 = Stopwatch.createStarted();

        Future<Double> doubleFuture = shop.getPriceFuture("pizza");

        System.out.println("getPriceFuture return after: " + stopwatch.stop());

        doSomethingElse();
        try{
            final Double price = doubleFuture.get();
            System.out.println("price is " + price + " return after: " + stopwatch2.stop());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static void doSomethingElse() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        DelayUtil.delay();
        System.out.println("doSomethingElse " + stopwatch.stop());

    }


}

錯誤處理

如果計算價格的方法產生了錯誤,提示錯誤的異常會被現在在試圖計算商品價格的當前線程的範圍內,最終計算的非同步線程會被殺死,這會導致get方法返回結果的客戶端永久的被等待。

如何避免異常被掩蓋, completeExceptionally會把CompletableFuture內發生的問題拋出去。


    private static void test2() {
        Shop shop = new Shop("BestShop");

        Stopwatch stopwatch = Stopwatch.createStarted();
        Stopwatch stopwatch2 = Stopwatch.createStarted();

        Future<Double> doubleFuture = shop.getPriceFutureException("pizza");

        System.out.println("getPriceFuture return after: " + stopwatch.stop());

        doSomethingElse();
        try{
            final Double price = doubleFuture.get();
            System.out.println("price is " + price + " return after: " + stopwatch2.stop());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

方法改造:

//非同步方式查詢產品價格,異常拋出去
    public Future<Double> getPriceFutureException(String product){


        final CompletableFuture<Double> doubleCompletableFuture = new CompletableFuture<>();

        new Thread(()->{try {
            doubleCompletableFuture.complete(alculatePriceException(product));
        }catch (Exception ex){
            doubleCompletableFuture.completeExceptionally(ex);
        }
        }).start();

        return doubleCompletableFuture;
    }

無堵塞

即讓多個線程去非同步並行或者併發的執行任務,計算完之後匯聚結果;


    private static void test3(String productName) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .map(item -> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)))
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test3 done in  " + stopwatch.stop());


    }

    private static void test3_parallelStream(String productName) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .parallel()
                .map(item -> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)))
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test3_parallelStream done in  " + stopwatch.stop());


    }


    private static void test3_completableFuture(String productName) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test3_completableFuture done in  " + stopwatch.stop());


    }



    private static void test3_completableFuture_pool(String productName) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售價是:%s", item.getName(), productName, item.getPrice(productName)),pool))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test3_completableFuture done in  " + stopwatch.stop());


    }

代碼中有一個簡單的計算場景,我想查詢4家商店的iphone11售價;

華強北,益田蘋果店,香港九龍城,京東商城;

每一家的查詢大概耗時1s;

任務處理方式 耗時 優缺點說明
順序執行 4秒多 簡單,好理解
並行流 1秒多 無法定製流內置的線程池,使用簡單,改造簡單
CompletableFuture 預設線程池 2秒多 預設線程池
CompletableFuture 指定線程池 1秒多 指定了線程池,可定製性更好,相比於並行流

多個非同步任務的流水線操作

場景: 先計算價格,在拿到折扣,最後計算折扣價格;


    
    private static void test4(String productName) {

        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .map(shop->shop.getPrice_discount(productName))
                .map(Quote::parse)
                .map(DisCount::applyDiscount)
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test4 done in  " + stopwatch.stop());


    }

    private static void test4_completableFuture(String productName) {

        Stopwatch stopwatch = Stopwatch.createStarted();
        final List<String> stringList = Stream.of(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"))
                .map(shop->CompletableFuture.supplyAsync(()->shop.getPrice_discount(productName),pool))
                .map(future->future.thenApply( Quote::parse))
                .map(future->future.thenCompose(quote -> CompletableFuture.supplyAsync(()->DisCount.applyDiscount(quote),pool)))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        System.out.println(stringList);
        System.out.println("test4_completableFuture done in  " + stopwatch.stop());


    }

以上是有依賴關係的兩個任務的聚合,即任務2,依賴任務1的結果。使用的是thenCompose方法;

接下來如果有兩個任務可以非同步執行,最後需要依賴著兩個任務的結果計算得到最終結果,採用的是thenCombine;

//兩個不同的任務,最後需要匯聚結果,採用combine
    private static void test5(String productName) {

        Stopwatch stopwatch = Stopwatch.createStarted();


        Shop shop = new Shop("香港九龍");

      Double pricefinal =  CompletableFuture.supplyAsync(()->shop.getPrice(productName))
                .thenCombine(CompletableFuture.supplyAsync(shop::getRate),(price, rate)->price * rate).join();


        System.out.println("test4 done in  " + stopwatch.stop());


    }

completion事件

讓任務儘快結束,無需等待;
有多個服務來源,你請求多個,誰先返回,就先響應;

結果依次返回:

 //等待所有的任務執行完畢; CompletableFuture.allOf()
    public void findPriceStream(String productName){
        List<Shop> shops = Arrays.asList(new Shop("華強北"), new Shop("益田假日廣場"), new Shop("香港九龍城"), new Shop("京東商城"));
        final CompletableFuture[] completableFutureArray = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool)))
                .map(f -> f.thenAccept(System.out::println))
                .toArray(size -> new CompletableFuture[size]);


        CompletableFuture.allOf(completableFutureArray).join();

    }

多個來源獲取最快的結果:

//有兩個獲取天氣的途徑,哪個快最後結果就取哪一個
    public static void getWeather(){
        final Object join = CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> a_weather()), CompletableFuture.supplyAsync(() -> b_weather())).join();

        System.out.println(join);
    }

    private static String b_weather() {
        DelayUtil.delay(3);
        return "bWeather";
    }

    private static String a_weather() {
        DelayUtil.delay(5);
        return "aWeather";
    }

源碼分析

可完備化的將來;CompletableFuture ;

先看簽名:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

實現了Futrue,CompletionStage介面;
這兩個介面簡單說明一下:

介面 關鍵特性
Future 直接翻譯為未來,標識把一個任務非同步執行,需要的的時候,通過get方法獲取,也可以取消cancel,此外還提供了狀態查詢方法,isDone, isCancled,實現類是FutureTask
CompletionStage 直接翻譯是完成的階段,提供了函數式編程方法

可以分為如下幾類方法

方法 說明
thenApply(Function f) 當前階段正常完成之後,返回一個新的階段,新的階段把當前階段的結果作為參數輸入;
thenConsume(Consumer c), 當前階段完成之後,結果作為參數輸入,直接消費掉,得到不返回結果的完成階段;
thenRun(Runnable action), 不接受參數,只是繼續執行任務,得到一個新的完成階段;
thenCombine(otherCompletionStage,BiFunction), 當兩個完成階段都完成的時候,執行BIFunction,返回一個新的階段;
thenAcceptBoth(OtherCompletionStage, BiConsumer) 兩個完成階段都完成之後,對兩個結果進行消費;
runAfterBoth(OtherCompletionStage,Runable) 兩個完成階段都完成之後,執行一個動作;
applyToEither(OtherCompletionStage,Function) 兩個完成階段的任何一個執行結束,進入函數操作,並返回一個新的階段
acceptEither(OtherCompletionStage,Consumer) 兩個完成階段的任何一個執行結束,消費掉,返回一個空返回值的完成階段
runAfterEither(OtherCompletionStage,Runable) 兩個完成階段的任何一個結束,執行一個動作,返回一個空返回值的完成階段
thenCompose(Function) 當前階段完成,返回值作為參數,進行函數運算,然後結果作為一個新的完成階段
exceptionally(Function) 無論當前階段是否正常完成,消費掉異常,然後返回值作為一個新的完成階段
whenComplete
handle 無論當前完成階段是否正常結束,都執行一個BIFunction的函數,並返回一個新結果作為一個新的完成階段
toCompletableFuture 轉換為ComplatableFuture

裡面的實現細節後面單獨成文章再講。

小結

  1. 執行一些比較耗時的操作,尤其是依賴一個或者多個遠程服務的操作,可以使用非同步任務改善程式的性能,加快程式的響應速度;
  2. 使用CompletableFuture你可以輕鬆的實現非同步API;
  3. CompletableFuture提供了異常管理機制,讓主線程有機會接管子任務拋出的異常;
  4. 把同步API封裝到CompletableFuture中,可以非同步得到它的結果;
  5. 如果非同步任務之間互相獨立,而他們之間的某一些結果是另外一些的輸入,可以把這些任務進行compose;
  6. 可以為CompletableFuture中的任務註冊一個回調函數,當任務執行完畢之後再進行一些其它操作;
  7. 你可以決定什麼時候結束程式的運行,是所有的CompletableFuture任務所有對象執行完畢,或者只要其中任何一個完成即可。

原創不易,轉載請註明出處。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 代理通常就是一個介於尋求方和提供方之間的中介系統。其核心思想就是客戶端(尋求方)沒有直接和提供方(真實對象)打交道,而是通過代理對象來完成提供方提供的資源或操作。 代理其實就是封裝實際服務對象的包裝器或代理人。代理可以為其包裝的對象提供附加功能,而無需改變此對象的代碼。代理模式的主要目的是為其他對象 ...
  • 無論函數傳遞的參數的可變還是不可變,只要針對參數使用賦值語句,會在函數內部修改局部變數的引用,不會影響到外部變數的引用,而如果傳遞的參數是可變類型,在函數內部使用方法修改了數據的內容,同樣會影響到外部的數據。 運行結果: 運行結果: 無論函數傳遞的參數的可變還是不可變,只要針對參數使用賦值語句,會在 ...
  • 1. 資料庫設計經驗,為什麼進行分表?分庫?一般多少數據量開始分表?分庫?分庫分表的目的?什麼是資料庫垂直拆分?水平拆分?分區等等 一:為什麼要分表 當一張表的數據達到幾百萬時,你查詢一次所花的時間會變多,如果有聯合查詢的話,有可能會死在那兒了。分表的目的就在於此,減小資料庫的負擔,縮短查詢時間。日 ...
  • 單元測試 先看一個需求 在我們工作中,我們會遇到這樣的情況,就是去確認一個函數,或者一個模塊的結果是否正確. 傳統的方法 15.2.1 傳統的方式來進行測試 在 main 函數中,調用 addUpper 函數,看看實際輸出的結果是否和預期的結果一致,如果一致, 則說明函數正確,否則函數有錯誤,然後修 ...
  • PHP-Casbin 是一個強大的、高效的開源訪問控制框架,它支持基於各種訪問控制模型的許可權管理。 Think-Casbin 是一個專為 ThinkPHP5.1 定製的 Casbin 的擴展包,使開發者更便捷的在 thinkphp 項目中使用 Casbin。 安裝 創建 thinkphp 項目(如果 ...
  • 策略模式 一、什麼是策略模式 策略模式作為一種軟體設計模式,指對象有某個行為,但是在不同的場景中,該行為有不同的實現演算法。比如每個人都要“交個人所得稅”,但是“在美國交個人所得稅”和“在中國交個人所得稅”就有不同的算稅方法。 策略模式(Strategy) ,定義了一組演算法,將每個演算法都封裝起來,並且 ...
  • 章文件操作 文件的基本介紹 文件的概念 文件,對我們並不陌生,文件是數據源(保存數據的地方)的一種,比如大家經常使用的 word 文檔,txt 文 件,excel 文件...都是文件。文件最主要的作用就是保存數據,它既可以保存一張圖片,也可以保持視頻,聲 音... 輸入流和輸出流 os.File 封 ...
  • 定義(以下是百度百科中的定義): Java介面:Java介面是一系列方法的聲明,是一些方法特征的集合,一個介面只有方法的特征沒有方法的實現,因此這些方法可以在不同的地方被不同的類實現,而這些實現可以具有不同的行為(功能)。 個人補充:Java介面中也可以有屬性,只不過必須為 靜態常量(由public ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...