併發編程-CompletableFuture解析

来源:https://www.cnblogs.com/jingdongkeji/archive/2023/07/28/17587006.html
-Advertisement-
Play Games

CompletableFuture對象是JDK1.8版本新引入的類,這個類實現了兩個介面,一個是Future介面,一個是CompletionStage介面。 ...


1、CompletableFuture介紹

CompletableFuture對象是JDK1.8版本新引入的類,這個類實現了兩個介面,一個是Future介面,一個是CompletionStage介面。

CompletionStage介面是JDK1.8版本提供的介面,用於非同步執行中的階段處理,CompletionStage定義了一組介面用於在一個階段執行結束之後,要麼繼續執行下一個階段,要麼對結果進行轉換產生新的結果等,一般來說要執行下一個階段都需要上一個階段正常完成,這個類也提供了對異常結果的處理介面

2、CompletableFuture的API

2.1 提交任務

在CompletableFuture中提交任務有以下幾種方式:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

這四個方法都是用來提交任務的,不同的是supplyAsync提交的任務有返回值,runAsync提交的任務沒有返回值。兩個介面都有一個重載的方法,第二個入參為指定的線程池,如果不指定,則預設使用ForkJoinPool.commonPool()線程池。在使用的過程中儘量根據不同的業務來指定不同的線程池,方便對不同線程池進行監控,同時避免業務共用線程池相互影響。

2.2 結果轉換

2.2.1 thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

thenApply這一組函數入參是Function,意思是將上一個CompletableFuture執行結果作為入參,再次進行轉換或者計算,重新返回一個新的值。

2.2.2 handle

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

handle這一組函數入參是BiFunction,該函數式介面有兩個入參一個返回值,意思是處理上一個CompletableFuture的處理結果,同時如果有異常,需要手動處理異常。

2.2.3 thenRun

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenRun這一組函數入參是Runnable函數式介面,該介面無需入參和出參,這一組函數是在上一個CompletableFuture任務執行完成後,在執行另外一個介面,不需要上一個任務的結果,也不需要返回值,只需要在上一個任務執行完成後執行即可。

2.2.4 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

thenAccept這一組函數的入參是Consumer,該函數式介面有一個入參,沒有返回值,所以這一組介面的意思是處理上一個CompletableFuture的處理結果,但是不返回結果。

2.2.5 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth這一組函數入參包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的介面,在JDK中只有一個實現類:CompletableFuture,所以第一個入參就是CompletableFuture,這一組函數是用來接受兩個CompletableFuture的返回值,並將其組合到一起。BiConsumer這個函數式介面有兩個入參,並且沒有返回值,BiConsumer的第一個入參就是調用方CompletableFuture的執行結果,第二個入參就是thenAcceptBoth介面入參的CompletableFuture的執行結果。所以這一組函數意思是將兩個CompletableFuture執行結果合併到一起。

2.2.6 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

thenCombine這一組函數和thenAcceptBoth類似,入參都包含一個CompletionStage,也就是CompletableFuture對象,意思也是組合兩個CompletableFuture的執行結果,不同的是thenCombine的第二個入參為BiFunction,該函數式介面有兩個入參,同時有一個返回值。所以與thenAcceptBoth不同的是,thenCombine將兩個任務結果合併後會返回一個全新的值作為出參。

2.2.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

thenCompose這一組函數意思是將調用方的執行結果作為Function函數的入參,同時返回一個新的CompletableFuture對象。

2.3 回調方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

whenComplete方法意思是當上一個CompletableFuture對象任務執行完成後執行該方法。BiConsumer函數式介面有兩個入參沒有返回值,這兩個入參第一個是CompletableFuture任務的執行結果,第二個是異常信息。表示處理上一個任務的結果,如果有異常,則需要手動處理異常,與handle方法的區別在於,handle方法的BiFunction是有返回值的,而BiConsumer是沒有返回值的。

以上方法都有一個帶有Async的方法,帶有Async的方法表示是非同步執行的,會將該任務放到線程池中執行,同時該方法會有一個重載的方法,最後一個參數為Executor,表示非同步執行可以指定線程池執行。為了方便進行控制,最好在使用CompletableFuture時手動指定我們的線程池。

2.4 異常處理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally是用來處理異常的,當任務拋出異常後,可以通過exceptionally來進行處理,也可以選擇使用handle來進行處理,不過兩者有些不同,hand是用來處理上一個任務的結果,如果有異常情況,就處理異常。而exceptionally可以放在CompletableFuture處理的最後,作為兜底邏輯來處理未知異常。

2.5 獲取結果

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf是需要入參中所有的CompletableFuture任務執行完成,才會進行下一步;

anyOf是入參中任何一個CompletableFuture任務執行完成都可以執行下一步。

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()

get方法一個是不帶超時時間的,一個是帶有超時時間的。

getNow方法則是立即返回結果,如果還沒有結果,則返回預設值,也就是該方法的入參。

join方法是不帶超時時間的等待任務完成。

3、CompletableFuture原理

join方法同樣表示獲取結果,但是join與get方法有什麼區別呢。

public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
}

public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

以上是CompletableFuture類中兩個方法的代碼,可以看到兩個方法幾乎一樣。區別在於reportJoin/reportGet,waitingGet方法是一致的,只不過參數不一樣,我們在看下reportGet與reportJoin方法。

private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

private static <T> T reportJoin(Object r) {
        if (r instanceof AltResult) {
            Throwable x;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if (x instanceof CompletionException)
                throw (CompletionException)x;
            throw new CompletionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

可以看到這兩個方法很相近,reportGet方法判斷了r對象是否為空,並拋出了中斷異常,而reportJoin方法沒有判斷,同時reportJoin拋出的都是運行時異常,所以join方法也是無需手動捕獲異常的。

我們在看下waitingGet方法

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = SPINS;
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }

該waitingGet方法是通過while的方式迴圈判斷是否任務已經完成並產生結果,如果結果為空,則會一直在這裡迴圈,這裡需要註意的是在這裡初始化了一下spins=-1,當第一次進入while迴圈的時候,spins是-1,這時會將spins賦值為一個常量,該常量為SPINS。

private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
                                      1 << 8 : 0);

這裡判斷可用CPU數是否大於1,如果大於1,則該常量為 1<< 8,也就是256,否則該常量為0。

第二次進入while迴圈的時候,spins是256大於0,這裡做了減一的操作,下次進入while迴圈,如果還沒有結果,依然是大於0繼續做減一的操作,此處用來做短時間的自旋等待結果,只有當spins等於0,後續會進入正常流程判斷。

我們在看下timedGet方法的源碼

private Object timedGet(long nanos) throws TimeoutException {
        if (Thread.interrupted())
            return null;
        if (nanos <= 0L)
            throw new TimeoutException();
        long d = System.nanoTime() + nanos;
        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
        boolean queued = false;
        Object r;
        // We intentionally don't spin here (as waitingGet does) because
        // the call to nanoTime() above acts much like a spin.
        while ((r = result) == null) {
            if (!queued)
                queued = tryPushStack(q);
            else if (q.interruptControl < 0 || q.nanos <= 0L) {
                q.thread = null;
                cleanStack();
                if (q.interruptControl < 0)
                    return null;
                throw new TimeoutException();
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q.interruptControl < 0)
            r = null;
        q.thread = null;
        postComplete();
        return r;
    }

timedGet方法依然是通過while迴圈的方式來判斷是否已經完成,timedGet方法入參為一個納秒值,並通過該值計算出一個deadline截止時間,當while迴圈還未獲取到任務結果且已經達到截止時間,則拋出一個TimeoutException異常。

4、CompletableFuture實現多線程任務

這裡我們通過CompletableFuture來實現一個多線程處理非同步任務的例子。

這裡我們創建10個任務提交到我們指定的線程池中執行,並等待這10個任務全部執行完畢。

每個任務的執行流程為第一次先執行加法,第二次執行乘法,如果發生異常則返回預設值,當10個任務執行完成後依次列印每個任務的結果。

public void demo() throws InterruptedException, ExecutionException, TimeoutException {
        // 1、自定義線程池
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100));

        // 2、集合保存future對象
        List<CompletableFuture<Integer>> futures = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            CompletableFuture<Integer> future = CompletableFuture
                    // 提交任務到指定線程池
                    .supplyAsync(() -> this.addValue(finalI), executorService)
                    // 第一個任務執行結果在此處進行處理
                    .thenApplyAsync(k -> this.plusValue(finalI, k), executorService)
                    // 任務執行異常時處理異常並返回預設值
                    .exceptionally(e -> this.defaultValue(finalI, e));
            // future對象添加到集合中
            futures.add(future);
        }

        // 3、等待所有任務執行完成,此處最好加超時時間
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
        for (CompletableFuture<Integer> future : futures) {
            Integer num = future.get();
            System.out.println("任務執行結果為:" + num);
        }
        System.out.println("任務全部執行完成!");
    }

    private Integer addValue(Integer index) {
        System.out.println("第" + index + "個任務第一次執行");
        if (index == 3) {
            int value = index / 0;
        }
        return index + 3;
    }

    private Integer plusValue(Integer index, Integer num) {
        System.out.println("第" + index + "個任務第二次執行,上次執行結果:" + num);
        return num * 10;
    }

    private Integer defaultValue(Integer index, Throwable e) {
        System.out.println("第" + index + "個任務執行異常!" + e.getMessage());
        e.printStackTrace();
        return 10;
    }

作者:京東物流 丁冬

來源:京東雲開發者社區 自猿其說Tech


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.python解釋器安裝 1> 需要到官網下載安裝python解釋器 步驟1:官網鏈接:https://www.python.org/downloads/,選擇Downloads下Windows版本 步驟2:版本較多,選擇適合的Windows版本進行下載,如下: 2> 解釋器安裝 步驟4:下載好後 ...
  • 理解二叉樹深度定義,思路剖析後可以使用深度優先搜索,這麼高大上的名字背後使用的是遞歸函數,遞歸三要素還可以回憶到嗎?還是已經拋到九霄雲外了! ...
  • **原文鏈接:** [Go 語言 select 都能做什麼?](https://mp.weixin.qq.com/s/YyyMzYxMi8I4HEaxzy4c7g) 在 Go 語言中,`select` 是一個關鍵字,用於監聽和 `channel` 有關的 IO 操作。 通過 `select` 語句, ...
  • [TOC] # 前言 做嵌入式的上位機開發需要要用到Qt的,Qt是一個開源、跨平臺的程式和UI開發框架。我們使用Qt可以用Python或者C++進行開發,這裡我使用的全部都是C++,不涉及到Python。 # 一、Qt安裝 要學習Qt前先得學習一下如何安裝Qt,這裡安裝的是QtIDE,是Qt的集成開 ...
  • `json.load()`和`json.loads()`都是Python標準庫`json`模塊中用於處理JSON數據的方法,二者的作用都是將JSON數據轉換為Python數據類型,它們之間的區別如下: ### 1. `json.load()`是從文件中讀取JSON數據 `json.load()`用於 ...
  • # 批處理 - **基本介紹:** 1. 當需要成批插入或者更新記錄時。可以採用Java的批量更新機制,這一機制允許多條語句一次性提交給資料庫批量處理。通常情況下比單獨提交處理更有效率。 2. JDBC的批量處理語句包括下麵方法: - addBatch():添加需要批量處理的SQL語句或參數; - ...
  • 公眾號服務號每個月只能群發推送四次文章,我們可以使用模板消息為公眾號粉絲推送信息 下麵是使用golang實現的模板消息發送類庫封裝,輕鬆實現模板消息發送 wechat.go package lib import ( "github.com/silenceper/wechat/v2" "github. ...
  • ## 教程簡介 Drupal是使用PHP語言編寫的開源內容管理框架(CMF),它由內容管理系統(CMS)和PHP開發框架(Framework)共同構成,在GPL2.0及更新協議下發佈。連續多年榮獲全球最佳CMS大獎,是基於PHP語言最著名的WEB應用程式。截止2011年底,共有13,802位WEB專 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...