死磕 java線程系列之線程池深入解析——未來任務執行流程

来源:https://www.cnblogs.com/tong-yuan/archive/2019/11/04/11795216.html
-Advertisement-
Play Games

(手機橫屏看源碼更方便) 註:java源碼分析部分如無特殊說明均基於 java8 版本。 註:線程池源碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了線程池中普通任務的執行流程,但其實線程池中還有一種任務,叫作未來任務(future task),使用它您可以獲 ...


threadpool_futuretask

(手機橫屏看源碼更方便)


註:java源碼分析部分如無特殊說明均基於 java8 版本。

註:線程池源碼部分如無特殊說明均指ThreadPoolExecutor類。

簡介

前面我們一起學習了線程池中普通任務的執行流程,但其實線程池中還有一種任務,叫作未來任務(future task),使用它您可以獲取任務執行的結果,它是怎麼實現的呢?

建議學習本章前先去看看彤哥之前寫的《死磕 java線程系列之自己動手寫一個線程池(續)》,有助於理解本章的內容,且那邊的代碼比較短小,學起來相對容易一些。

問題

(1)線程池中的未來任務是怎麼執行的?

(2)我們能學到哪些比較好的設計模式?

(3)對我們未來學習別的框架有什麼幫助?

來個慄子

我們還是從一個例子入手,來講解來章的內容。

我們定義一個線程池,並使用它提交5個任務,這5個任務分別返回0、1、2、3、4,在未來的某一時刻,我們再取用它們的返回值,做一個累加操作。

public class ThreadPoolTest02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 新建一個固定5個線程的線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        List<Future<Integer>> futureList = new ArrayList<>();
        // 提交5個任務,分別返回0、1、2、3、4
        for (int i = 0; i < 5; i++) {
            int num = i;

            // 任務執行的結果用Future包裝
            Future<Integer> future = threadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("return: " + num);
                // 返回值
                return num;
            });

            // 把future添加到list中
            futureList.add(future);
        }

        // 任務全部提交完再從future中get返回值,並做累加
        int sum = 0;
        for (Future<Integer> future : futureList) {
            sum += future.get();
        }

        System.out.println("sum=" + sum);
    }
}

這裡我們思考兩個問題:

(1)如果這裡使用普通任務,要怎麼寫,時間大概是多少?

如果使用普通任務,那麼就要把累加操作放到任務裡面,而且並不是那麼好寫(final的問題),總時間大概是1秒多一點。但是,這樣有一個缺點,就是累加操作跟任務本身的內容耦合到一起了,後面如果改成累乘,還要修改任務的內容。

(2)如果這裡把future.get()放到for迴圈裡面,時間大概是多少?

這個問題我們先不回答,先來看源碼分析。

submit()方法

submit方法,它是提交有返回值任務的一種方式,內部使用未來任務(FutureTask)包裝,再交給execute()去執行,最後返回未來任務本身。

public <T> Future<T> submit(Callable<T> task) {
    // 非空檢測
    if (task == null) throw new NullPointerException();
    // 包裝成FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 交給execute()方法去執行
    execute(ftask);
    // 返回futureTask
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 將普通任務包裝成FutureTask
    return new FutureTask<T>(callable);
}

這裡的設計很巧妙,實際上這兩個方法都是在AbstractExecutorService這個抽象類中完成的,這是模板方法的一種運用。

我們來看看FutureTask的繼承體系:

threadpool_futuretask

FutureTask實現了RunnableFuture介面,而RunnableFuture介面組合了Runnable介面和Future介面的能力,而Future介面提供了get任務返回值的能力。

問題:submit()方法返回的為什麼是Future介面而不是RunnableFuture介面或者FutureTask類呢?

答:這是因為submit()返回的結果,對外部調用者只想暴露其get()的能力(Future介面),而不想暴露其run()的能力(Runaable介面)。

FutureTask類的run()方法

經過上一章的學習,我們知道execute()方法最後調用的是task的run()方法,上面我們傳進去的任務,最後被包裝成了FutureTask,也就是說execute()方法最後會調用到FutureTask的run()方法,所以我們直接看這個方法就可以了。

public void run() {
    // 狀態不為NEW,或者修改為當前線程來運行這個任務失敗,則直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    
    try {
        // 真正的任務
        Callable<V> c = callable;
        // state必須為NEW時才運行
        if (c != null && state == NEW) {
            // 運行的結果
            V result;
            boolean ran;
            try {
                // 任務執行的地方【本文由公從號“彤哥讀源碼”原創】
                result = c.call();
                // 已執行完畢
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 處理異常
                setException(ex);
            }
            if (ran)
                // 處理結果
                set(result);
        }
    } finally {
        // 置空runner
        runner = null;
        // 處理中斷
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看到代碼也比較簡單,先做狀態的檢測,再執行任務,最後處理結果或異常。

執行任務這裡沒啥問題,讓我們看看處理結果或異常的代碼。

protected void setException(Throwable t) {
    // 將狀態從NEW置為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置為傳進來的異常(outcome為調用get()方法時返回的)
        outcome = t;
        // 最終的狀態設置為EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 調用完成方法
        finishCompletion();
    }
}
protected void set(V v) {
    // 將狀態從NEW置為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置為傳進來的結果(outcome為調用get()方法時返回的)
        outcome = v;
        // 最終的狀態設置為NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 調用完成方法
        finishCompletion();
    }
}

咋一看,這兩個方法似乎差不多,不同的是出去的結果不一樣且狀態不一樣,最後都調用了finishCompletion()方法。

private void finishCompletion() {
    // 如果隊列不為空(這個隊列實際上為調用者線程)
    for (WaitNode q; (q = waiters) != null;) {
        // 置空隊列
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 調用者線程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 如果調用者線程不為空,則喚醒它
                    // 【本文由公從號“彤哥讀源碼”原創】
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 鉤子方法,子類重寫
    done();
    // 置空任務
    callable = null;        // to reduce footprint
}

整個run()方法總結下來:

(1)FutureTask有一個狀態state控制任務的運行過程,正常運行結束state從NEW->COMPLETING->NORMAL,異常運行結束state從NEW->COMPLETING->EXCEPTIONAL;

(2)FutureTask保存了運行任務的線程runner,它是線程池中的某個線程;

(3)調用者線程是保存在waiters隊列中的,它是什麼時候設置進去的呢?

(4)任務執行完畢,除了設置狀態state變化之外,還要喚醒調用者線程。

調用者線程是什麼時候保存在FutureTask中(waiters)的呢?查看構造方法:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

發現並沒有相關信息,我們再試想一下,如果調用者不調用get()方法,那麼這種未來任務是不是跟普通任務沒有什麼區別?確實是的哈,所以只有調用get()方法了才有必要保存調用者線程到FutureTask中。

所以,我們來看看get()方法中是什麼鬼。

FutureTask類的get()方法

get()方法調用時如果任務未執行完畢,會阻塞直到任務結束。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果狀態小於等於COMPLETING,則進入隊列等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 返回結果(異常)
    return report(s);
}

是不是很清楚,如果任務狀態小於等於COMPLETING,則進入隊列等待。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 我們這裡假設不帶超時
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 處理中斷
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 4. 如果狀態大於COMPLETING了,則跳出迴圈並返回
        // 這是自旋的出口
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 如果狀態等於COMPLETING,說明任務快完成了,就差設置狀態到NORMAL或EXCEPTIONAL和設置結果了
        // 這時候就讓出CPU,優先完成任務
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 1. 如果隊列為空
        else if (q == null)
            // 初始化隊列(WaitNode中記錄了調用者線程)
            q = new WaitNode();
        // 2. 未進入隊列
        else if (!queued)
            // 嘗試入隊
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 超時處理
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 3. 阻塞當前線程(調用者線程)
        else
            // 【本文由公從號“彤哥讀源碼”原創】
            LockSupport.park(this);
    }
}

這裡我們假設調用get()時任務還未執行,也就是其狀態為NEW,我們試著按上面標示的1、2、3、4走一遍邏輯:

(1)第一次迴圈,狀態為NEW,直接到1處,初始化隊列並把調用者線程封裝在WaitNode中;

(2)第二次迴圈,狀態為NEW,隊列不為空,到2處,讓包含調用者線程的WaitNode入隊;

(3)第三次迴圈,狀態為NEW,隊列不為空,且已入隊,到3處,阻塞調用者線程;

(4)假設過了一會任務執行完畢了,根據run()方法的分析最後會unpark調用者線程,也就是3處會被喚醒;

(5)第四次迴圈,狀態肯定大於COMPLETING了,退出迴圈並返回;

問題:為什麼要在for迴圈中控制整個流程呢,把這裡的每一步單獨拿出來寫行不行?

答:因為每一次動作都需要重新檢查狀態state有沒有變化,如果拿出去寫也是可以的,只是代碼會非常冗長。這裡只分析了get()時狀態為NEW,其它的狀態也可以自行驗證,都是可以保證正確的,甚至兩個線程交叉運行(斷點的技巧)。

OK,這裡返回之後,再看看是怎麼處理最終的結果的。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 任務正常結束
    if (s == NORMAL)
        return (V)x;
    // 被取消了
    if (s >= CANCELLED)
        throw new CancellationException();
    // 執行異常
    throw new ExecutionException((Throwable)x);
}

還記得前面分析run的時候嗎,任務執行異常時是把異常放在outcome裡面的,這裡就用到了。

(1)如果正常執行結束,則返回任務的返回值;

(2)如果異常結束,則包裝成ExecutionException異常拋出;

通過這種方式,線程中出現的異常也可以返回給調用者線程了,不會像執行普通任務那樣調用者是不知道任務執行到底有沒有成功的。

其它

FutureTask除了可以獲取任務的返回值以外,還能夠取消任務的執行。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

這裡取消任務是通過中斷執行線程來處理的,有興趣的同學可以自己分析一下。

回答開篇

如果這裡把future.get()放到for迴圈裡面,時間大概是多少?

答:大概會是5秒多一點,因為每提交一個任務,都要阻塞調用者線程直到任務執行完畢,每個任務執行都是1秒多,所以總時間就是5秒多點。

總結

(1)未來任務是通過把普通任務包裝成FutureTask來實現的。

(2)通過FutureTask不僅能夠獲取任務執行的結果,還有感知到任務執行的異常,甚至還可以取消任務;

(3)AbstractExecutorService中定義了很多模板方法,這是一種很重要的設計模式;

(4)FutureTask其實就是典型的異常調用的實現方式,後面我們學習到Netty、Dubbo的時候還會見到這種設計思想的。

彩蛋

RPC框架中非同步調用是怎麼實現的?

答:RPC框架常用的調用方式有同步調用、非同步調用,其實它們本質上都是非同步調用,它們就是用FutureTask的方式來實現的。

一般地,通過一個線程(我們叫作遠程線程)去調用遠程介面,如果是同步調用,則直接讓調用者線程阻塞著等待遠程線程調用的結果,待結果返回了再返回;如果是非同步調用,則先返回一個未來可以獲取到遠程結果的東西FutureXxx,當然,如果這個FutureXxx在遠程結果返回之前調用了get()方法一樣會阻塞著調用者線程。

有興趣的同學可以先去預習一下dubbo的非同步調用(它是把Future扔到RpcContext中的)。


歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 多重背包問題 給定$n$種物品,第$i$種共有$c_i$個,價值為$v_i$,重量為$w_i$。現在有一個背包,最大載重量為$m$。求若選一些物品放到背包里,最多能放的總價值是多少。 解法$\bm1$ 考慮將多重背包轉化為01背包。最簡單的想法是將$1$種物品直接拆分成$c_i$個相同的物品,然後0 ...
  • 先安裝一下這個命令 add-apt-repositoryapt-get install software-properties-common 添加第三方源:add-apt-repository ppa:ondrej/phpapt-get update 安裝php:apt-get install ph ...
  • 一、進程與線程之間的關係 1、線程是屬於進程的,線程運行在進程空間內,同一進程所產生的線程共用同一記憶體空間,當進程退出時該進程所產生的線程都會被強制退出並清除。 2、線程可與屬於同一進程的其它線程共用進程所擁有的全部資源,但是其本身基本上不擁有系統資源,只擁有一點在運行中必不可少的信息(如程式計數器 ...
  • 本篇文章給大家帶來的內容是關於Laravel服務容器的綁定與解析,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。 前言 老實說,第一次老大讓我看laravel框架手冊的那天早上,我是很絕望的,因為真的沒接觸過,對我這種渣渣來說,laravel的入門門檻確實有點高了,但還是得硬著頭皮看 ...
  • 開發環境: Windows操作系統 開發工具:MyEclipse/Eclipse + JDK+ Tomcat + MySQL 資料庫 項目簡介: 戶籍管理系統主體將圍繞戶籍信息,身份證服務管理等方面進行展開設計,系統分為前臺信息展示,後臺的數據處理兩大模塊。必須選擇非功能性需求與功能需求共同實施,提 ...
  • 本文源碼: "GitHub·點這裡" || "GitEE·點這裡" "01:項目技術選型簡介,架構圖解說明" "02:業務架構設計,系統分層管理" "03:資料庫選型,業務數據設計規劃" 04:中間件集成,公共服務管理 一、中間件簡介 中間件是基礎軟體的一類, 屬於復用性極高的軟體。處於操作系統軟體 ...
  • 重構是 一種對軟體進行修改的行為,但它並不改變軟體的功能特征,而是通過讓軟體程式更清晰,更簡潔和更條理來改進軟體的質量。代碼重構之於軟體,相當於結構修改之於散文。每次人們對如何對代碼進行重構的討論就像是討論如果對一篇文學作品進行修訂一樣無休無止。所有人都知道應該根據項目的自身情況來對代碼進行重構,而 ...
  • 1. springboot是對spring的缺點進行改善和優化,它的約定大於配置,開箱即用,沒有代碼生成,也不需要xml文件配置,可以修改屬性值來滿足需求 2. springboot的入門程式 在idea中創建springboot的項目 (1) 預設有個DemoApplication類,是sprin ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...