Java8已經發佈7年了,不會還有人沒用過CompletableFuture吧

来源:https://www.cnblogs.com/yidengjiagou/archive/2022/06/15/16379535.html
-Advertisement-
Play Games

日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。 要知道CompletableFuture已經隨著Ja ...


日常開發中,我們都會用到線程池,一般會用execute()和submit()方法提交任務。但是當你用過CompletableFuture之後,就會發現以前的線程池處理任務有多難用,功能有多簡陋,CompletableFuture又是多麼簡潔優雅。

要知道CompletableFuture已經隨著Java8發佈7年了,還沒有過它就有點說不過去了。
今天5分鐘帶你深入淺出CompletableFuture實用教程。

1. 使用線程池處理任務

/**
 * @author yideng
 * @apiNote 線程池使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        
        List<Integer> list = Arrays.asList(1, 2, 3);
        List<Future<String>> futures = new ArrayList<>();
        for (Integer key : list) {
            // 2. 提交任務
            Future<String> future = executorService.submit(() -> {
                // 睡眠一秒,模仿處理過程
                Thread.sleep(1000L);
                return "結果" + key;
            });
            futures.add(future);
        }

        // 3. 獲取結果
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }

}

輸出結果:

結果1
結果2
結果3

一般大家都會這樣使用線程池,但是有沒有思考過這樣使用有沒有什麼問題?
反正我發現兩個比較嚴重的問題:

  1. 獲取結果時,調用的future.get()方法,會阻塞當前線程,直到返回結果,大大降低性能
  2. 有一半的代碼在寫怎麼使用線程,其實我們不應該關心怎麼使用線程,更應該關註任務的處理

有沒有具體的優化方案呢?當然有了,請出來我們今天的主角CompletableFuture

2. 使用CompletableFuture重構任務處理

看一下使用CompletableFuture改造後代碼:

/**
 * @author yideng
 * @apiNote CompletableFuture使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        for (Integer key : list) {
            // 2. 提交任務
            CompletableFuture.supplyAsync(() -> {
                // 睡眠一秒,模仿處理過程
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
                return "結果" + key;
            }, executorService).whenCompleteAsync((result, exception) -> {
                // 3. 獲取結果
                System.out.println(result);
            });;
        }

        executorService.shutdown();
        // 由於whenCompleteAsync獲取結果的方法是非同步的,所以要阻塞當前線程才能輸出結果
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

輸出結果:

結果1
結果2
結果3

代碼中使用了CompletableFuture的兩個方法,
supplyAsync()方法作用是提交非同步任務,有兩個傳參,任務和自定義線程池。
whenCompleteAsync()方法作用是非同步獲取結果,也有兩個傳參,結果和異常信息。

代碼經過CompletableFuture改造後,是多麼的簡潔優雅。
提交任務也不用再關心線程池是怎麼使用了,獲取結果也不用再阻塞當前線程了。

如果你比較倔強,還想同步獲取結果,可以使用whenComplete()方法,或者單獨調用join()方法。
join()方法配合Stream流是這樣用的:

/**
 * @author yideng
 * @apiNote CompletableFuture使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交任務
        List<String> results = list.stream().map(key ->
                CompletableFuture.supplyAsync(() -> {
                    // 睡眠一秒,模仿處理過程
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                    }
                    return "結果" + key;
                }, executorService))
                .map(CompletableFuture::join).collect(Collectors.toList());

        executorService.shutdown();
        // 3. 獲取結果
        System.out.println(results);

    }

}

輸出結果:

[結果1,結果2,結果3]

多麼的簡潔優雅啊!原來executorService.submit()這種使用線程池的方式,可以徹底丟掉了。

3. CompletableFuture更多妙用

3.1 等待所有任務執行完成

如果讓你實現等待所有任務線程執行完成,再進行下一步操作,你會怎麼做?
我猜你一定會使用 線程池+CountDownLatch,像下麵這樣:

/**
 * @author yideng
 * @apiNote 線程池和CountDownLatch使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (Integer key : list) {
            // 2. 提交任務
            executorService.execute(() -> {
                // 睡眠一秒,模仿處理過程
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
                System.out.println("結果" + key);
                countDownLatch.countDown();
            });
        }

        executorService.shutdown();
        // 3. 阻塞等待所有任務執行完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
        }
    }

}

輸出結果:

結果2
結果3
結果1

Low不Low?十年前可以這樣寫,Java8都已經發佈7年了,你還不會用Java8的寫法?看一下使用CompletableFuture是怎麼重構的:

/**
 * @author yideng
 * @apiNote CompletableFuture.allOf()方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交任務,並調用join()阻塞等待所有任務執行完成
        CompletableFuture
                .allOf(
                        list.stream().map(key ->
                                CompletableFuture.runAsync(() -> {
                                    // 睡眠一秒,模仿處理過程
                                    try {
                                        Thread.sleep(1000L);
                                    } catch (InterruptedException e) {
                                    }
                                    System.out.println("結果" + key);
                                }, executorService))
                                .toArray(CompletableFuture[]::new))
                .join();
        executorService.shutdown();
    }

}

輸出結果:

結果3
結果1
結果2

代碼看著有點亂,其實邏輯很清晰。

  1. 遍歷list集合,提交CompletableFuture任務,把結果轉換成數組
  2. 再把數組放到CompletableFuture的allOf()方法裡面
  3. 最後調用join()方法阻塞等待所有任務執行完成

CompletableFuture的allOf()方法的作用就是,等待所有任務處理完成。
這樣寫是不是簡潔優雅了許多?

3.2 任何一個任務處理完成就返回

如果要實現這樣一個需求,往線程池提交一批任務,只要有其中一個任務處理完成就返回。
該怎麼做?如果你手動實現這個邏輯的話,代碼肯定複雜且低效,有了CompletableFuture就非常簡單了,只需調用anyOf()方法就行了。

/**
 * @author yideng
 * @apiNote CompletableFuture.anyOf()方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        long start = System.currentTimeMillis();
        // 2. 提交任務
        CompletableFuture<Object> completableFuture = CompletableFuture
                .anyOf(
                        list.stream().map(key ->
                                CompletableFuture.supplyAsync(() -> {
                                    // 睡眠一秒,模仿處理過程
                                    try {
                                        Thread.sleep(1000L);
                                    } catch (InterruptedException e) {
                                    }
                                    return "結果" + key;
                                }, executorService))
                                .toArray(CompletableFuture[]::new));
        executorService.shutdown();

        // 3. 獲取結果
        System.out.println(completableFuture.join());
    }

}

輸出結果:

結果3

一切都是那麼簡單優雅。

3.3 一個線程執行完成,交給另一個線程接著執行

有這麼一個需求:
一個線程處理完成,把處理的結果交給另一個線程繼續處理,怎麼實現?

你是不是想到了一堆工具,線程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,該怎麼進行組合使用呢?AB組合還是BC組合?

別瞎想了,你寫的肯定沒有CompletableFuture好用,看一下CompletableFuture是怎麼用的:

/**
 * @author yideng
 * @apiNote CompletableFuture線程接力處理示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        // 1. 創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 2. 提交任務,並調用join()阻塞等待任務執行完成
        String result2 = CompletableFuture.supplyAsync(() -> {
            // 睡眠一秒,模仿處理過程
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            return "結果1";
        }, executorService).thenApplyAsync(result1 -> {
            // 睡眠一秒,模仿處理過程
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            return result1 + "結果2";
        }, executorService).join();

        executorService.shutdown();
        // 3. 獲取結果
        System.out.println(result2);
    }

}

輸出結果:

結果1結果2

代碼主要用到了CompletableFuture的thenApplyAsync()方法,作用就是非同步處理上一個線程的結果。

是不是太方便了?

這麼好用的CompletableFuture還有沒有其他功能?當然有。

4. CompletableFuture常用API

4.1 CompletableFuture常用API說明

  1. 提交任務
    supplyAsync
    runAsync

  2. 接力處理

    thenRun thenRunAsync
    thenAccept thenAcceptAsync
    thenApply thenApplyAsync
    handle handleAsync
    applyToEither applyToEitherAsync
    acceptEither acceptEitherAsync
    runAfterEither runAfterEitherAsync
    thenCombine thenCombineAsync
    thenAcceptBoth thenAcceptBothAsync

API太多,有點眼花繚亂,很容易分類。
帶run的方法,無入參,無返回值。
帶accept的方法,有入參,無返回值。
帶supply的方法,無入參,有返回值。
帶apply的方法,有入參,有返回值。
帶handle的方法,有入參,有返回值,並且帶異常處理。
以Async結尾的方法,都是非同步的,否則是同步的。
以Either結尾的方法,只需完成任意一個。
以Both/Combine結尾的方法,必須所有都完成。

  1. 獲取結果
    join 阻塞等待,不會拋異常
    get 阻塞等待,會拋異常
    complete(T value) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,則返回傳參value。
    completeExceptionally(Throwable ex) 不阻塞,如果任務已完成,返回處理結果。如果沒完成,拋異常。

4. CompletableFuture常用API使用示例

用最常見的煮飯來舉例:

4.1 then、handle方法使用示例

/**
 * @author yideng
 * @apiNote then、handle方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("1. 開始淘米");
            return "2. 淘米完成";
        }).thenApplyAsync(result -> {
            System.out.println(result);
            System.out.println("3. 開始煮飯");
            // 生成一個1~10的隨機數
            if (RandomUtils.nextInt(1, 10) > 5) {
                throw new RuntimeException("4. 電飯煲壞了,煮不了");
            }
            return "4. 煮飯完成";
        }).handleAsync((result, exception) -> {
            if (exception != null) {
                System.out.println(exception.getMessage());
                return "5. 今天沒飯吃";
            } else {
                System.out.println(result);
                return "5. 開始吃飯";
            }
        });

        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

輸出結果可能是:

1. 開始淘米
2. 淘米完成
3. 開始煮飯
4. 煮飯完成
5. 開始吃飯

也可能是:

1. 開始淘米
2. 淘米完成
3. 開始煮飯
java.lang.RuntimeException: 4. 電飯煲壞了,煮不了
5. 今天沒飯吃

4.2 complete方法使用示例

/**
 * @author yideng
 * @apiNote complete使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "飯做好了";
        });
        
        //try {
        //    Thread.sleep(1L);
        //} catch (InterruptedException e) {
        //}

        completableFuture.complete("飯還沒做好,我點外賣了");
        System.out.println(completableFuture.join());
    }

}

輸出結果:

飯還沒做好,我點外賣了

如果把註釋的sleep()方法放開,輸出結果就是:

飯做好了

4.3 either方法使用示例

/**
 * @author yideng
 * @apiNote either方法使用示例
 */
public class ThreadDemo {

    public static void main(String[] args) {
        CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {
            return "飯做好了";
        });
        CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {
            return "外賣到了";
        });

        // 飯先做好,就吃飯。外賣先到,就吃外賣。就是這麼任性。
        CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {
            return myMeal;
        });

        System.out.println(completableFuture.join());
    }

}

輸出結果可能是:

飯做好了

也可能是:

外賣到了

學會了嗎?開發中趕快用起來吧!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡介 在 JavaScript 中,迭代器是一個對象,它定義一個序列,併在終止時可能返回一個返回值。 更具體地說,迭代器是通過使用 next() 方法實現迭代器協議的任何一個對象,該方法返回具有兩個屬性的對象: value,這是序列中的 next 值;和 done ,如果已經迭代到序列中的最後一個值 ...
  • 1. Express框架是什麼 1.1 Express是一個基於Node平臺的web應用開發框架,它提供了一系列的強大特性,幫助你創建各種Web應用。我們可以使用 npm install express 命令進行下載。 1.2 Express初體驗 // 引入express框架 const expr ...
  • 遞歸查找文件 引言 或許是文件太多,想找某個文件又忘記放哪了;又或者是項目改造,需要將外部調用介面進行改造,項目太多,又無法排查。那麼怎麼快速找到自己想要的內容就是一件值得思考的事情了。 根據特定內容尋找文件位置 package com.lizi.globalexception.Utils; imp ...
  • 大家好,本文我將繼續來剖析SpringCloud中負載均衡組件Ribbon的源碼。本來我是打算接著OpenFeign動態代理生成文章直接講Feign是如何整合Ribbon的,但是文章寫了一半發現,如果不把Ribbon好好講清楚,那麼有些Ribbon的細節理解起來就很困難,所以我還是打算單獨寫一篇文章 ...
  • phpcurl函數類模擬Curl get post header refer攜帶Cookie模擬訪問來源Refer模擬UseaAgent ...
  • 一個工作5年的粉絲找到我,他說參加美團面試,遇到一個基礎題沒回答上來。 這個問題是:“資料庫連接池有什麼用?以及它有哪些關鍵參數”? 我說,這個問題都不知道,那你項目裡面的連接池配置怎麼設置的? 你們猜他怎麼回答。懂得懂得啊。 好的,關於這個問題,我們來看看普通人和高手的回答。 普通人: 資料庫連接 ...
  • Principle of token bucket 隨著互聯網的發展,在處理流量的方法也不僅僅為 first-come,first-served,而在共用網路中實現流量管理的基本機制就是排隊。而公平演算法則是實現在優先順序隊列中基於哪些策略來排隊的”公平隊列“。Token Bucket 則是為公平排隊提 ...
  • 1. 指數增長模型 設第今年的人口為 \(x_0\),年增長率為 \(r\),預測 \(k\) 年後的人口為 \(x_k\),則有 \[ x_k = x_0(1+r)^k \tag{1} \] 這個模型的前提是年增長率 \(r\) 在 \(k\) 年內保持不變. 利用 (1) 式可以根據人口估計年增 ...
一周排行
    -Advertisement-
    Play Games
  • C#TMS系統代碼-基礎頁面BaseCity學習 本人純新手,剛進公司跟領導報道,我說我是java全棧,他問我會不會C#,我說大學學過,他說這個TMS系統就給你來管了。外包已經把代碼給我了,這幾天先把增刪改查的代碼背一下,說不定後面就要趕鴨子上架了 Service頁面 //using => impo ...
  • 委托與事件 委托 委托的定義 委托是C#中的一種類型,用於存儲對方法的引用。它允許將方法作為參數傳遞給其他方法,實現回調、事件處理和動態調用等功能。通俗來講,就是委托包含方法的記憶體地址,方法匹配與委托相同的簽名,因此通過使用正確的參數類型來調用方法。 委托的特性 引用方法:委托允許存儲對方法的引用, ...
  • 前言 這幾天閑來沒事看看ABP vNext的文檔和源碼,關於關於依賴註入(屬性註入)這塊兒產生了興趣。 我們都知道。Volo.ABP 依賴註入容器使用了第三方組件Autofac實現的。有三種註入方式,構造函數註入和方法註入和屬性註入。 ABP的屬性註入原則參考如下: 這時候我就開始疑惑了,因為我知道 ...
  • C#TMS系統代碼-業務頁面ShippingNotice學習 學一個業務頁面,ok,領導開完會就被裁掉了,很突然啊,他收拾東西的時候我還以為他要旅游提前請假了,還在尋思為什麼回家連自己買的幾箱飲料都要叫跑腿帶走,怕被偷嗎?還好我在他開會之前拿了兩瓶芬達 感覺感覺前面的BaseCity差不太多,這邊的 ...
  • 概述:在C#中,通過`Expression`類、`AndAlso`和`OrElse`方法可組合兩個`Expression<Func<T, bool>>`,實現多條件動態查詢。通過創建表達式樹,可輕鬆構建複雜的查詢條件。 在C#中,可以使用AndAlso和OrElse方法組合兩個Expression< ...
  • 閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
  • 1. 案例 成某三甲醫預約系統, 該項目在2024年初進行上線測試,在正常運行了兩天後,業務系統報錯:The connection pool has been exhausted, either raise MaxPoolSize (currently 800) or Timeout (curren ...
  • 背景 我們有些工具在 Web 版中已經有了很好的實踐,而在 WPF 中重新開發也是一種費時費力的操作,那麼直接集成則是最省事省力的方法了。 思路解釋 為什麼要使用 WPF?莫問為什麼,老 C# 開發的堅持,另外因為 Windows 上已經裝了 Webview2/edge 整體打包比 electron ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...