Java8已經發佈7年了,不會還有人沒用過CompletableFuture吧

来源:https://www.cnblogs.com/yidengjiagou/archive/2022/06/15/16379535.html
-Advertisement-
Play Games

日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。 要知道CompletableFuture已經隨著Ja ...


日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。

要知道CompletableFuture已經隨著Java8發佈7年了,還沒有過它就有點說不過去了。
今天5分鐘帶你深入淺出CompletableFuture實用教程。

1. 使用線程池處理任務

/**
 * @author yideng
 * @apiNote 線程池使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        
        List<Integer> list = Arrays.asList(1, 2, 3);
        List<Future<String>> futures = new ArrayList<>();
        for (Integer key : list) {
            // 2. 提交任務
            Future<String> future = executorService.submit(() -> {
                // 睡眠一秒,模仿處理過程
                Thread.sleep(1000L);
                return "結果" + key;
            });
            futures.add(future);
        }

        // 3. 獲取結果
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }

}

輸出結果:

結果1
結果2
結果3

一般大家都會這樣使用線程池,但是有沒有思考過這樣使用有沒有什麼問題?
反正我發現兩個比較嚴重的問題:

  1. 獲取結果時,調用的future.get()方法,會阻塞當前線程,直到返回結果,大大降低性能
  2. 有一半的代碼在寫怎麼使用線程,其實我們不應該關心怎麼使用線程,更應該關註任務的處理

有沒有具體的優化方案呢?當然有了,請出來我們今天的主角CompletableFuture

2. 使用CompletableFuture重構任務處理

看一下使用CompletableFuture改造後代碼:

/**
 * @author yideng
 * @apiNote CompletableFuture使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        for (Integer key : list) {
            // 2. 提交任務
            CompletableFuture.supplyAsync(() -> {
                // 睡眠一秒,模仿處理過程
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
                return "結果" + key;
            }, executorService).whenCompleteAsync((result, exception) -> {
                // 3. 獲取結果
                System.out.println(result);
            });;
        }

        executorService.shutdown();
        // 由於whenCompleteAsync獲取結果的方法是非同步的,所以要阻塞當前線程才能輸出結果
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

輸出結果:

結果1
結果2
結果3

代碼中使用了CompletableFuture的兩個方法,
supplyAsync()方法作用是提交非同步任務,有兩個傳參,任務和自定義線程池。
whenCompleteAsync()方法作用是非同步獲取結果,也有兩個傳參,結果和異常信息。

代碼經過CompletableFuture改造後,是多麼的簡潔優雅。
提交任務也不用再關心線程池是怎麼使用了,獲取結果也不用再阻塞當前線程了。

如果你比較倔強,還想同步獲取結果,可以使用whenComplete()方法,或者單獨調用join()方法。
join()方法配合Stream流是這樣用的:

/**
 * @author yideng
 * @apiNote CompletableFuture使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交任務
        List<String> results = list.stream().map(key ->
                CompletableFuture.supplyAsync(() -> {
                    // 睡眠一秒,模仿處理過程
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                    }
                    return "結果" + key;
                }, executorService))
                .map(CompletableFuture::join).collect(Collectors.toList());

        executorService.shutdown();
        // 3. 獲取結果
        System.out.println(results);

    }

}

輸出結果:

[結果1,結果2,結果3]

多麼的簡潔優雅啊!原來executorService.submit()這種使用線程池的方式,可以徹底丟掉了。

3. CompletableFuture更多妙用

3.1 等待所有任務執行完成

如果讓你實現等待所有任務線程執行完成,再進行下一步操作,你會怎麼做?
我猜你一定會使用 線程池+CountDownLatch,像下麵這樣:

/**
 * @author yideng
 * @apiNote 線程池和CountDownLatch使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (Integer key : list) {
            // 2. 提交任務
            executorService.execute(() -> {
                // 睡眠一秒,模仿處理過程
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
                System.out.println("結果" + key);
                countDownLatch.countDown();
            });
        }

        executorService.shutdown();
        // 3. 阻塞等待所有任務執行完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
        }
    }

}

輸出結果:

結果2
結果3
結果1

Low不Low?十年前可以這樣寫,Java8都已經發佈7年了,你還不會用Java8的寫法?看一下使用CompletableFuture是怎麼重構的:

/**
 * @author yideng
 * @apiNote CompletableFuture.allOf()方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交任務,並調用join()阻塞等待所有任務執行完成
        CompletableFuture
                .allOf(
                        list.stream().map(key ->
                                CompletableFuture.runAsync(() -> {
                                    // 睡眠一秒,模仿處理過程
                                    try {
                                        Thread.sleep(1000L);
                                    } catch (InterruptedException e) {
                                    }
                                    System.out.println("結果" + key);
                                }, executorService))
                                .toArray(CompletableFuture[]::new))
                .join();
        executorService.shutdown();
    }

}

輸出結果:

結果3
結果1
結果2

代碼看著有點亂,其實邏輯很清晰。

  1. 遍歷list集合,提交CompletableFuture任務,把結果轉換成數組
  2. 再把數組放到CompletableFuture的allOf()方法裡面
  3. 最後調用join()方法阻塞等待所有任務執行完成

CompletableFuture的allOf()方法的作用就是,等待所有任務處理完成。
這樣寫是不是簡潔優雅了許多?

3.2 任何一個任務處理完成就返回

如果要實現這樣一個需求,往線程池提交一批任務,只要有其中一個任務處理完成就返回。
該怎麼做?如果你手動實現這個邏輯的話,代碼肯定複雜且低效,有了CompletableFuture就非常簡單了,只需調用anyOf()方法就行了。

/**
 * @author yideng
 * @apiNote CompletableFuture.anyOf()方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        long start = System.currentTimeMillis();
        // 2. 提交任務
        CompletableFuture<Object> completableFuture = CompletableFuture
                .anyOf(
                        list.stream().map(key ->
                                CompletableFuture.supplyAsync(() -> {
                                    // 睡眠一秒,模仿處理過程
                                    try {
                                        Thread.sleep(1000L);
                                    } catch (InterruptedException e) {
                                    }
                                    return "結果" + key;
                                }, executorService))
                                .toArray(CompletableFuture[]::new));
        executorService.shutdown();

        // 3. 獲取結果
        System.out.println(completableFuture.join());
    }

}

輸出結果:

結果3

一切都是那麼簡單優雅。

3.3 一個線程執行完成,交給另一個線程接著執行

有這麼一個需求:
一個線程處理完成,把處理的結果交給另一個線程繼續處理,怎麼實現?

你是不是想到了一堆工具,線程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,該怎麼進行組合使用呢?AB組合還是BC組合?

別瞎想了,你寫的肯定沒有CompletableFuture好用,看一下CompletableFuture是怎麼用的:

/**
 * @author yideng
 * @apiNote CompletableFuture線程接力處理示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 2. 提交任務,並調用join()阻塞等待任務執行完成
        String result2 = CompletableFuture.supplyAsync(() -> {
            // 睡眠一秒,模仿處理過程
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            return "結果1";
        }, executorService).thenApplyAsync(result1 -> {
            // 睡眠一秒,模仿處理過程
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            return result1 + "結果2";
        }, executorService).join();

        executorService.shutdown();
        // 3. 獲取結果
        System.out.println(result2);
    }

}

輸出結果:

結果1結果2

代碼主要用到了CompletableFuture的thenApplyAsync()方法,作用就是非同步處理上一個線程的結果。

是不是太方便了?

這麼好用的CompletableFuture還有沒有其他功能?當然有。

4. CompletableFuture常用API

4.1 CompletableFuture常用API說明

  1. 提交任務
    supplyAsync
    runAsync

  2. 接力處理

    thenRun thenRunAsync
    thenAccept thenAcceptAsync
    thenApply thenApplyAsync
    handle handleAsync
    applyToEither applyToEitherAsync
    acceptEither acceptEitherAsync
    runAfterEither runAfterEitherAsync
    thenCombine thenCombineAsync
    thenAcceptBoth thenAcceptBothAsync

API太多,有點眼花繚亂,很容易分類。
帶run的方法,無入參,無返回值。
帶accept的方法,有入參,無返回值。
帶supply的方法,無入參,有返回值。
帶apply的方法,有入參,有返回值。
帶handle的方法,有入參,有返回值,並且帶異常處理。
以Async結尾的方法,都是非同步的,否則是同步的。
以Either結尾的方法,只需完成任意一個。
以Both/Combine結尾的方法,必須所有都完成。

  1. 獲取結果
    join 阻塞等待,不會拋異常
    get 阻塞等待,會拋異常
    complete(T value) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,則返回傳參value。
    completeExceptionally(Throwable ex) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,拋異常。

4. CompletableFuture常用API使用示例

用最常見的煮飯來舉例:

4.1 then、handle方法使用示例

/**
 * @author yideng
 * @apiNote then、handle方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("1. 開始淘米");
            return "2. 淘米完成";
        }).thenApplyAsync(result -> {
            System.out.println(result);
            System.out.println("3. 開始煮飯");
            // 生成一個1~10的隨機數
            if (RandomUtils.nextInt(1, 10) > 5) {
                throw new RuntimeException("4. 電飯煲壞了,煮不了");
            }
            return "4. 煮飯完成";
        }).handleAsync((result, exception) -> {
            if (exception != null) {
                System.out.println(exception.getMessage());
                return "5. 今天沒飯吃";
            } else {
                System.out.println(result);
                return "5. 開始吃飯";
            }
        });

        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

輸出結果可能是:

1. 開始淘米
2. 淘米完成
3. 開始煮飯
4. 煮飯完成
5. 開始吃飯

也可能是:

1. 開始淘米
2. 淘米完成
3. 開始煮飯
java.lang.RuntimeException: 4. 電飯煲壞了,煮不了
5. 今天沒飯吃

4.2 complete方法使用示例

/**
 * @author yideng
 * @apiNote complete使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "飯做好了";
        });
        
        //try {
        //    Thread.sleep(1L);
        //} catch (InterruptedException e) {
        //}

        completableFuture.complete("飯還沒做好,我點外賣了");
        System.out.println(completableFuture.join());
    }

}

輸出結果:

飯還沒做好,我點外賣了

如果把註釋的sleep()方法放開,輸出結果就是:

飯做好了

4.3 either方法使用示例

/**
 * @author yideng
 * @apiNote either方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {
            return "飯做好了";
        });
        CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {
            return "外賣到了";
        });

        // 飯先做好,就吃飯。外賣先到,就吃外賣。就是這麼任性。
        CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {
            return myMeal;
        });

        System.out.println(completableFuture.join());
    }

}

輸出結果可能是:

飯做好了

也可能是:

外賣到了

學會了嗎?開發中趕快用起來吧!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡介 在 JavaScript 中,迭代器是一個對象,它定義一個序列,併在終止時可能返回一個返回值。 更具體地說,迭代器是通過使用 next() 方法實現迭代器協議的任何一個對象,該方法返回具有兩個屬性的對象: value,這是序列中的 next 值;和 done ,如果已經迭代到序列中的最後一個值 ...
  • 1. Express框架是什麼 1.1 Express是一個基於Node平臺的web應用開發框架,它提供了一系列的強大特性,幫助你創建各種Web應用。我們可以使用 npm install express 命令進行下載。 1.2 Express初體驗 // 引入express框架 const expr ...
  • 遞歸查找文件 引言 或許是文件太多,想找某個文件又忘記放哪了;又或者是項目改造,需要將外部調用介面進行改造,項目太多,又無法排查。那麼怎麼快速找到自己想要的內容就是一件值得思考的事情了。 根據特定內容尋找文件位置 package com.lizi.globalexception.Utils; imp ...
  • 大家好,本文我將繼續來剖析SpringCloud中負載均衡組件Ribbon的源碼。本來我是打算接著OpenFeign動態代理生成文章直接講Feign是如何整合Ribbon的,但是文章寫了一半發現,如果不把Ribbon好好講清楚,那麼有些Ribbon的細節理解起來就很困難,所以我還是打算單獨寫一篇文章 ...
  • phpcurl函數類模擬Curl get post header refer攜帶Cookie模擬訪問來源Refer模擬UseaAgent ...
  • 一個工作5年的粉絲找到我,他說參加美團面試,遇到一個基礎題沒回答上來。 這個問題是:“資料庫連接池有什麼用?以及它有哪些關鍵參數”? 我說,這個問題都不知道,那你項目裡面的連接池配置怎麼設置的? 你們猜他怎麼回答。懂得懂得啊。 好的,關於這個問題,我們來看看普通人和高手的回答。 普通人: 資料庫連接 ...
  • Principle of token bucket 隨著互聯網的發展,在處理流量的方法也不僅僅為 first-come,first-served,而在共用網路中實現流量管理的基本機制就是排隊。而公平演算法則是實現在優先順序隊列中基於哪些策略來排隊的”公平隊列“。Token Bucket 則是為公平排隊提 ...
  • 1. 指數增長模型 設第今年的人口為 \(x_0\),年增長率為 \(r\),預測 \(k\) 年後的人口為 \(x_k\),則有 \[ x_k = x_0(1+r)^k \tag{1} \] 這個模型的前提是年增長率 \(r\) 在 \(k\) 年內保持不變. 利用 (1) 式可以根據人口估計年增 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...