【併發編程】Future模式添加Callback及Promise 模式

来源:https://www.cnblogs.com/weknow619/archive/2019/05/16/10873519.html
-Advertisement-
Play Games

【併發編程】Future模式添加Callback及Promise 模式 ...


Future

Future是Java5增加的類,它用來描述一個非同步計算的結果。你可以使用 isDone 方法檢查計算是否完成,或者使用 get 方法阻塞住調用線程,直到計算完成返回結果。你也可以使用 cancel 方法停止任務的執行。下麵來一個慄子:

public class FutureDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> f = es.submit(() ->{
            Thread.sleep(10000);
            // 結果
            return 100;
        });

        // do something

        Integer result = f.get();
        System.out.println(result);

//        while (f.isDone()) {
//            System.out.println(result);
//        }
    }
}

在這個例子中,我們往線程池中提交了一個任務並立即返回了一個Future對象,接著可以做一些其他操作,最後利用它的 get 方法阻塞等待結果或 isDone 方法輪詢等待結果(關於Future的原理可以參考之前的文章:【併發編程】Future模式及JDK中的實現

雖然這些方法提供了非同步執行任務的能力,但是對於結果的獲取卻還是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。

阻塞的方式顯然和我們的非同步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時的得到計算結果,為什麼不能用觀察者設計模式當計算結果完成及時通知監聽者呢?

很多語言,比如Node.js,採用Callback的方式實現非同步編程。Java的一些框架,比如Netty,自己擴展了Java的 Future 介面,提供了 addListener 等多個擴展方法。Google的guava也提供了通用的擴展Future:ListenableFuture 、 SettableFuture 以及輔助類 Futures 等,方便非同步編程。為此,Java終於在JDK1.8這個版本中增加了一個能力更強的Future類:CompletableFuture 。它提供了非常強大的Future的擴展功能,可以幫助我們簡化非同步編程的複雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果。下麵來看看這幾種方式。

Netty-Future

引入Maven依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
public class NettyFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        EventExecutorGroup group = new DefaultEventExecutorGroup(4);
        System.out.println("開始:" + DateUtils.getNow());

        Future<Integer> f = group.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });

        f.addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> objectFuture) throws Exception {
                System.out.println("計算結果:" + objectFuture.get());
            }
        });

        System.out.println("結束:" + DateUtils.getNow());
        // 不讓守護線程退出
        new CountDownLatch(1).await();
    }
}

輸出結果:

開始:2019-05-16 08:25:40:779
結束:2019-05-16 08:25:40:788
開始耗時計算:2019-05-16 08:25:40:788
結束耗時計算:2019-05-16 08:25:50:789
計算結果:100

從結果可以看出,耗時計算結束後自動觸發Listener的完成方法,避免了主線程無謂的阻塞等待,那麼它究竟是怎麼做到的呢?下麵看源碼

DefaultEventExecutorGroup 實現了 EventExecutorGroup 介面,而 EventExecutorGroup 則是實現了JDK ScheduledExecutorService 介面的線程組介面,所以它擁有線程池的所有方法。然而它卻把所有返回 java.util.concurrent.Future 的方法重寫為返回 io.netty.util.concurrent.Future ,把所有返回 java.util.concurrent.ScheduledFuture 的方法重寫為返回 io.netty.util.concurrent.ScheduledFuture 。

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /**
     * 返回一個EventExecutor
     */
    EventExecutor next();

    Iterator<EventExecutor> iterator();

    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

EventExecutorGroup 的submit方法因為 newTaskFor 的重寫導致返回了netty的 Future 實現類,而這個實現類正是 PromiseTask 。

@Override
public <T> Future<T> submit(Callable<T> task) {
    return (Future<T>) super.submit(task);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new PromiseTask<T>(this, callable);
}

PromiseTask 的實現很簡單,它緩存了要執行的 Callable 任務,併在run方法中完成了任務調用和Listener的通知。

@Override
public void run() {
    try {
        if (setUncancellableInternal()) {
            V result = task.call();
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        setFailureInternal(e);
    }
}

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

任務調用成功或者失敗都會調用 notifyListeners 來通知Listener,所以大家得在回調的函數里調用 isSuccess 方法來檢查狀態。

這裡有一個疑惑,會不會 Future 在調用 addListener 方法的時候任務已經執行完成了,這樣子會不會通知就會失敗了啊?

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

可以發現,在Listener添加成功之後,會立即檢查狀態,如果任務已經完成立刻進行回調,所以這裡不用擔心啦。OK,下麵看看Guava-Future的實現。

Guava-Future

首先引入guava的Maven依賴:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>
public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });
        
        future.addListener(new Runnable() {
            @Override
            public void run() {
                System.out.println("調用成功");
            }
        }, executorService);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

ListenableFuture 可以通過 addListener 方法增加回調函數,一般用於不在乎執行結果的地方。如果需要在執行成功時獲取結果或者執行失敗時獲取異常信息,需要用到 Futures 工具類的 addCallback 方法:

Futures.addCallback(future, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(@Nullable Integer result) {
        System.out.println("成功,計算結果:" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("失敗");
    }
}, executorService);

前面提到除了 ListenableFuture 外,還有一個 SettableFuture 類也支持回調能力。它實現自 ListenableFuture ,所以擁有 ListenableFuture 的所有能力。

public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListenableFuture<Integer> future = submit(executorService);
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("成功,計算結果:" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("失敗:" + t.getMessage());
            }
        }, executorService);
        Thread.sleep(1000);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }

    private static ListenableFuture<Integer> submit(Executor executor) {
        SettableFuture<Integer> future = SettableFuture.create();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                // 返回值
                future.set(100);
                // 設置異常信息
//                future.setException(new RuntimeException("custom error!"));
            }
        });
        return future;
    }
}

看起來用法上沒有太多差別,但是有一個很容易被忽略的重要問題。當 SettableFuture 的這種方式最後調用了 cancel 方法後,線程池中的任務還是會繼續執行,而通過 submit 方法返回的 ListenableFuture 方法則會立即取消執行,這點尤其要註意。下麵看看源碼:

和Netty的Future一樣,Guava也是通過實現了自定義的 ExecutorService 實現類 ListeningExecutorService 來重寫了 submit 方法。

public interface ListeningExecutorService extends ExecutorService {
  <T> ListenableFuture<T> submit(Callable<T> task);
  ListenableFuture<?> submit(Runnable task);
  <T> ListenableFuture<T> submit(Runnable task, T result);
}

同樣的,newTaskFor 方法也被進行了重寫,返回了自定義的Future類:TrustedListenableFutureTask

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return TrustedListenableFutureTask.create(runnable, value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return TrustedListenableFutureTask.create(callable);
}

任務調用會走 TrustedFutureInterruptibleTask 的run方法:

@Override
public void run() {
    TrustedFutureInterruptibleTask localTask = task;
    if (localTask != null) {
        localTask.run();
    }
}

@Override
public final void run() {
    if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
        return; // someone else has run or is running.
    }
    try {
        // 抽象方法,子類進行重寫
        runInterruptibly();
    } finally {
        if (wasInterrupted()) {
            while (!doneInterrupting) {
                Thread.yield();
            }
        }
    }
}

最終還是調用到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法,等待任務完成後調用 set 方法。

@Override
void runInterruptibly() {
    if (!isDone()) {
        try {
            set(callable.call());
        } catch (Throwable t) {
            setException(t);
        }
    }
}

protected boolean set(@Nullable V value) {
    Object valueToSet = value == null ? NULL : value;
    // CAS設置值
    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        complete(this);
        return true;
    }
    return false;
}

在 complete 方法的最後會獲取到Listener進行回調。

上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不同,原因在於一個重寫了 afterDone 方法而一個沒有。

下麵是 ListenableFuture 的 afterDone 方法:

@Override
protected void afterDone() {
    super.afterDone();

    if (wasInterrupted()) {
        TrustedFutureInterruptibleTask localTask = task;
        if (localTask != null) {
            localTask.interruptTask();
        }
    }

    this.task = null;
}

wasInterrupted 用來判斷是否調用了 cancel (cancel方法會設置一個取消對象Cancellation到value中)

protected final boolean wasInterrupted() {
    final Object localValue = value;
    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}

interruptTask 方法通過線程的 interrupt 方法真正取消線程任務的執行:

final void interruptTask() {
    Thread currentRunner = runner;
    if (currentRunner != null) {
        currentRunner.interrupt();
    }
    doneInterrupting = true;
}

 

由 Callback Hell 引出 Promise 模式

如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什麼概念。

回調是一種我們推崇的非同步調用方式,但也會遇到問題,也就是回調的嵌套。當需要多個非同步回調一起書寫時,就會出現下麵的代碼(以 js 為例):

asyncFunc1(opt, (...args1) => { 
  asyncFunc2(opt, (...args2) => {       
    asyncFunc3(opt, (...args3) => {            
      asyncFunc4(opt, (...args4) => {
          // some operation
      });
    });
  });
});

雖然在 JAVA 業務代碼中很少出現回調的多層嵌套,但總歸是個問題,這樣的代碼不易讀,嵌套太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回調地獄的問題。可能就會有人想問:java 中存在 Promise 模式嗎?答案是肯定的。

前面提到了 Netty 和 Guava 的擴展都提供了 addListener 這樣的介面,用於處理 Callback 調用,但其實 jdk1.8 已經提供了一種更為高級的回調方式:CompletableFuture。首先嘗試用 CompletableFuture 來重寫上面回調的問題。

public class CompletableFutureTest {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("開始耗時計算:" + DateUtils.getNow());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結束耗時計算:" + DateUtils.getNow());
            return 100;
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("回調結果:" + result);
        });
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

使用CompletableFuture耗時操作沒有占用主線程的時間片,達到了非同步調用的效果。我們也不需要引入任何第三方的依賴,這都是依賴於 java.util.concurrent.CompletableFuture 的出現。CompletableFuture 提供了近 50 多個方法,大大便捷了 java 多線程操作,和非同步調用的寫法。

使用 CompletableFuture 解決回調地獄問題:

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        long l = System.currentTimeMillis();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("在回調中執行耗時操作...");
            Thread.sleep(10000);
            return 100;
        });
        completableFuture = completableFuture.thenCompose(i -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("在回調的回調中執行耗時操作...");
                Thread.sleep(10000);
                return i + 100;
            });
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("計算結果:" + result);
        });
        System.out.println("主線程運算耗時:" + (System.currentTimeMillis() - l) + " ms");
        new CountDownLatch(1).await();
    }
}

輸出:

在回調中執行耗時操作...主線程運算耗時:58 ms在回調的回調中執行耗時操作...計算結果:200

使用 thenCompose 或者 thenComposeAsync 等方法可以實現回調的回調,且寫出來的方法易於維護。

 

總的看來,為Future模式增加回調功能就不需要阻塞等待結果的返回並且不需要消耗無謂的CPU資源去輪詢處理狀態,JDK8之前使用Netty或者Guava提供的工具類,JDK8之後則可以使用自帶的 CompletableFuture 類。Future 有兩種模式:將來式和回調式。而回調式會出現回調地獄的問題,由此衍生出了 Promise 模式來解決這個問題。這才是 Future 模式和 Promise 模式的相關性。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 工廠方法模式 定義 工廠方法(Factory Method)模式的意義是定義一個創建產品對象的工廠介面,將實際創建工作推遲到子類當中。核心工廠類不再負責產品的創建,這樣核心類成為一個抽象工廠角色,僅負責具體工廠子類必須實現的介面,這樣進一步抽象化的好處是使得工廠方法模式可以使系統在不修改具體工廠角色 ...
  • saltstack遠程執行 安裝完Saltstack後可以立即執行shell命令,更新軟體包並將文件同時分不到所有受管系統。所有回覆都以一致的可配置格式返回。遠程執行參考文檔:http://docs.saltstack.cn/topics/tutorials/modules.html Salt命令的 ...
  • 最近做一個項目是關於採集指紋的系統,先給大家簡單介紹一下項目的主要功能: 該項目主要是做一個採集嬰幼兒的手掌指紋和掌紋的客戶端,並且通過服務端介面保存手掌指紋到阿裡雲oss存儲中。 同時後臺提供管理功能,對採集人員,系統角色許可權管理,同時提供嬰幼兒的手指指紋圖片的查看和分析功能。 系統分為三個子系統 ...
  • 什麼是微服務 簡而言之 : 微服務架構風格這種開發方法,是以開發一組小型服務的方式來開發一個獨立的應用系統的.其中每個小型服務都運行在自己的進行中,並經常採用HTTP資源API 這樣輕量的機制來相互通信.這些服務圍繞業務功能進行構建,並能通過全自動的部署機制來進行獨立部署.這些微服務可以使用不同的語 ...
  • 上一次留給大家去做的實踐,不知道大家執行的怎麼樣了呢。 我們通過一個簡單的練習,完成了一個控制開關。那現在,我們打算將遙控器的每個插槽,對應到一個命令這樣就要遙控器變成“調用者”。當按下按鈕,相應命令對象的execute()方法就會被調用,其結果就是,接收者(例如電燈、風扇、音響)的動作被調用。 實 ...
  • 依賴倒置原則: 一般來說我們認為作為底層基礎框架的邏輯是不應該依賴於上層邏輯的, 所以我們設計軟體時也經常是: 需求 - 上層邏輯(直接實現需求) - 發現需要固化的邏輯 - 開發底層模塊 - 然後上層調用底層邏輯. 但是這樣做一開始是沒問題的, 但是當上層劇烈變化時, 會不斷的侵染底層邏輯, 底層 ...
  • 開篇先嘚啵 昨天寫到哪了? 睡醒就忘了... ... 不過聰明伶俐的博主,僅用1秒鐘就想起來了 我們昨天學了一個pandas的類型series 並且會創建了,厲不厲害 對於一個新的數據結構來說 額,不對,對於python任意的數據結構來說 或者換句話,對於任何對象來說 看我,就沒有對象... ... ...
  • 一、問題描述 給定兩個字元串,求解這兩個字元串的最長公共子序列(Longest Common Sequence)。比如字元串1:BDCABA;字元串2:ABCBDAB。則這兩個字元串的最長公共子序列長度為4,最長公共子序列是:BCBA 二、演算法求解 這是一個動態規劃的題目。對於可用動態規劃求解的問題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...