併發編程-FutureTask解析

来源:https://www.cnblogs.com/Jcloud/archive/2023/07/27/17584613.html
-Advertisement-
Play Games

通過本文可以瞭解FutureTask任務執行的方式以及Future.get已阻塞的方式獲取線程執行的結果原理,並且從代碼中可以瞭解FutureTask的任務執行狀態以及狀態的變化過程。 ...


1、FutureTask對象介紹

Future對象大家都不陌生,是JDK1.5提供的介面,是用來以阻塞的方式獲取線程非同步執行完的結果。

在Java中想要通過線程執行一個任務,離不開Runnable與Callable這兩個介面。

Runnable與Callable的區別在於,Runnable介面只有一個run方法,該方法用來執行邏輯,但是並沒有返回值;而Callable的call方法,同樣用來執行業務邏輯,但是是有一個返回值的。

Callable執行任務過程中可以通過FutureTask獲得任務的執行狀態,並且可以在執行完成後通過Future.get()方式獲取執行結果。

Future是一個介面,而FutureTask就是Future的實現類。並且FutureTask實現了 RunnableFuture(Runnable + Future),說明我們可以創建一個FutureTask並直接把它放到線程池執行,然後獲取FutureTask的執行結果。

2、FutureTask源碼解析

2.1 主要方法和屬性

那麼FutureTask是如何通過阻塞的方式來獲取到非同步線程執行的結果的呢?我們看下FutureTask中的屬性。

// FutureTask的狀態及其常量
private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // callable對象,執行完後置空
    private Callable<V> callable;
    // 要返回的結果或要引發的異常來自 get() 方法
    private Object outcome; // non-volatile, protected by state reads/writes
    // 執行Callable的線程
    private volatile Thread runner;
    // 等待線程的一個鏈表結構
    private volatile WaitNode waiters;


FutureTask中幾個比較重要的方法。

// 取消任務的執行
boolean cancel(boolean mayInterruptIfRunning);
// 返回任務是否已經被取消
boolean isCancelled();
// 返回任務是否已經完成,任務狀態不為NEW即為完成
boolean isDone();
// 通過get方法獲取任務的執行結果
V get() throws InterruptedException, ExecutionException;
// 通過get方法獲取任務的執行結果,帶有超時,如果超過給定時間則拋出異常
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;


2.2 FutureTask執行

當我們線上程池中執行一個Callable方法時,其實是將Callable任務封裝成一個RunnableFuture對象去執行,同時將這個RunnableFuture對象返回,這樣我們就拿到了FutureTask的引用,可以隨時獲取到任務執行的狀態,並且可以在任務執行完成後通過該對象獲取執行結果。

以下為ThreadPoolExecutor線程池提交一個callable方法的源碼。

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
	
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }


2.3 run方法介紹

RunnableFuture其實也是一個可以執行的runnable,我們看下他的run方法。其主要流程就是執行call方法,正常執行完畢後將result結果賦值到outcome屬性上。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 將callable賦值到本地變數
            Callable<V> c = callable;
            // 判斷callable不為空並且FutureTask的狀態必須為新創建
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 執行call方法(用戶自己實現的call邏輯),並獲取到result結果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 如果執行過程出現異常,則將異常對象賦值到outcome上
                    setException(ex);
                }
                // 如果正常執行完畢,則將result賦值到outcome屬性上
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


以下邏輯為正常執行完成後賦值的邏輯。

// 如果任務沒有被取消,將future執行完的返回值賦值給result結果
// FutureTask任務的執行狀態是通過CAS的方式進行賦值的,並且由此可知,COMPLETING其實是一個瞬時狀態
// 當將線程執行結果賦值給outcome後,狀態會修改為對應的NORMAL,即正常結束
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }


以下為執行異常時賦值邏輯,直接將Throwable對象賦值到outcome屬性上。

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


無論是正常執行還是異常執行,最終都會調用一個finishCompletion方法,用來做工作的收尾工作。

2.4 get方法介紹

Future的get方法有兩個重載的方法,一個是get()獲取結果,一個是get(long, TimeUnit)帶有超時時間的獲取結果,我們看下FutureTask中的這兩個方法是如何實現的。

// 不帶有超時時間,一直阻塞直到獲取結果
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 等待結果完成,帶有超時的get方法也是調用的awaitDone方法
            s = awaitDone(false, 0L);
        // 返回結果
        return report(s);
    }

// 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 如果任務未中斷,調用awaitDone方法等待任務結果
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        // 返回結果
        return report(s);
    }


我們主要看下awaitDone方法的執行邏輯。此方法會通過for迴圈的方式一直阻塞等待任務執行完成。如果帶有超時時間,則超過截止時間後會直接返回。

// timed:是否需要超時獲取
// nanos:超時時間單位納秒
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 此方法會一直for迴圈判斷任務狀態是否已經完成,是Future.get阻塞的原因
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任務狀態大於COMPLETING,則表明任務結束,直接返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                // Thread.yield() 方法,使當前線程由執行狀態,變成為就緒狀態,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
                // COMPLETING狀態為瞬時狀態,任務執行完成,要麼是正常結束,要麼異常結束,後續會被置為NORMAL或者EXCEPTIONAL
                Thread.yield();
            else if (q == null)
                // 每調用一次get方法,都會創建一個WaitNode等待節點
                q = new WaitNode();
            else if (!queued)
                // 將該等待節點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法帶有超時判斷,則判斷當前時間是否已經超過了截止時間,如果超過了及截止日期,則退出迴圈直接返回當前狀態,此時任務狀態一定是NEW
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }


我們在看下report方法,在調用get方法時是如何返回結果的。

這裡首先獲取outcome的值,並判斷任務是否已經執行完成,如果執行完成,則將outcome對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個CancellationException異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀態位EXCEPTIONAL,此時的outcome即為Throwable對象,所以將outcome強轉為Throwable並拋出異常。

由此可以知道,我們將一個FutureTask任務submit到線程池中執行的時候,如果發生了異常,是會在調用get方法的時候拋出的。

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


2.5 cancel方法介紹

cancel方法用於取消正在運行的任務,如果任務取消成功,則返回TRUE,如果取消失敗則返回FALSE。

// mayInterruptIfRunning:允許中斷正在運行的任務
public boolean cancel(boolean mayInterruptIfRunning) {
        // mayInterruptIfRunning如果為true則將狀態置為INTERRUPTING,如果未false則將狀態置為CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        // 如果狀態修改成功後,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 取消後的收尾工作
            finishCompletion();
        }
        return true;
    }


2.6 isDone/isCancelled方法介紹

isDone方法用於判斷FutureTask是否已經完成;isCancelled方法用來判斷FutureTask是否已經取消,這兩個方法都是通過狀態位來判斷的。

public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }


2.7 finishCompletion方法介紹

我們看下finishCompletion方法都做了哪些工作。

// 刪除所有等待線程併發出信號,最後執行done方法
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }


我們看到done方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實現相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。

protected void done() { }

3、總結

通過源碼解讀可以瞭解到Future的原理:

第一步:主線程將任務封裝成一個Callable對象,通過submit方法提交到線程池去執行。

第二步:線程池執行任務的run方法,主線程則可以繼續執行其他邏輯。

第三步:線程池中方法執行完成後將結果賦值到outcome屬性上,並修改任務狀態。

第四步:主線程在需要拿到非同步任務結果的時候,主動調用fugure.get()方法來獲取結果。

第五步:如果非同步線程在執行過程中發生異常,則會在調用future.get()方法的時候拋出來。

以上就是對於FutureTask的分析,我們可以瞭解FutureTask任務執行的方式以及Future.get已阻塞的方式獲取線程執行的結果原理,並且從代碼中可以瞭解FutureTask的任務執行狀態以及狀態的變化過程。

作者:京東物流 丁冬

來源:京東雲開發者社區 自猿其說Tech


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Mac/Win 最新 IntelliJ IDEA 2023.2 激活破解教程,附激活碼(持續更新~),適用於 JetBrains 全家桶的所有工具。 ...
  • 部署容器是使用Docker和容器化管理應用程式更高效、易於擴展和確保跨環境一致性性能的關鍵步驟。本主題將為您概述如何部署Docker容器以創建和運行應用程式。 ## 概述 Docker容器是輕量級、可移植且自我包含的環境,可以運行應用程式及其依賴項。部署容器涉及啟動、管理和擴展這些隔離的環境,以便順 ...
  • ## 開篇介紹 Java 8 中新增的特性旨在幫助程式員寫出更好的代碼,其中對核心類庫的改進是很關鍵的一部分,也是本章的主要內容。對核心類庫的改進主要包括集合類的 API 和新引入的流(Stream),流使程式員得以站在更高的抽象層次上對集合進行操作。下麵將介紹stream流的用法。 ## 1.初始 ...
  • # 同步電路與非同步電路 - ## 同步電路 - 電路中所有觸發器均連接同一個時鐘脈衝源,觸發器的狀態變化均與時鐘脈衝信號同步; - 電路中所有時鐘同源同相; - 同相位時鐘:始終頻率不同,但是時鐘邊沿對齊; - ![](https://img2023.cnblogs.com/blog/1964011 ...
  • 上節討論瞭如何保障數據中台的數據質量,讓數據“準”。除了“快”和“準”,數據中台還離不開“省”。隨數據規模越來越大,成本越來越高,如不合理控製成本,還沒等你挖掘出數據應用價值,企業利潤就被消耗完。 能否做到精細化成本管理,關乎數據中台項目成敗。 某電商業務數據建設資源增長趨勢(CU= 1vcpu + ...
  • # 事務 - **基本介紹** 1. JDBC 程式中當一個Connection對象創建時,預設情況下是自動提交事務:每次執行一個 SQL 語句時,如果執行成功,就會向資料庫自動提交,而不能回滾。 2. JDBC程式中為了多個SQL語句作為一個整體執行,需要==使用事務==。 3. 調用 Conne ...
  • ## 教程簡介 Excel Power View 是一種數據可視化技術,用於創建互動式圖表、圖形、地圖和其他視覺效果,以便直觀呈現數據。 Excel Power View中,可以快速創建各種可視化效果,從表格和矩陣到餅圖、條形圖和氣泡圖,以及多個圖表的集合。要創建各種可視化效果,請首先從表格開始著手 ...
  • 經常開發表格,是不是已經被手寫Ant-Design Table的Columns整煩了?尤其是ToB項目,表格經常動不動就幾十列。每次照著後端給的介面文檔一個個配置,太頭疼了,主要是有時還會粘錯就尷尬了。那有沒有辦法能自動生成columns配置呢? ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...