死磕 java線程系列之自己動手寫一個線程池(續)

来源:https://www.cnblogs.com/tong-yuan/archive/2019/10/10/11651370.html
-Advertisement-
Play Games

(手機橫屏看源碼更方便) 問題 (1)自己動手寫的線程池如何支持帶返回值的任務呢? (2)如果任務執行的過程中拋出異常了該怎麼處理呢? 簡介 上一章我們自己動手寫了一個線程池,但是它是不支持帶返回值的任務的,那麼,我們自己能否實現呢?必須可以,今天我們就一起來實現帶返回值任務的線程池。 前情回顧 首 ...


mythreadpool

(手機橫屏看源碼更方便)


問題

(1)自己動手寫的線程池如何支持帶返回值的任務呢?

(2)如果任務執行的過程中拋出異常了該怎麼處理呢?

簡介

上一章我們自己動手寫了一個線程池,但是它是不支持帶返回值的任務的,那麼,我們自己能否實現呢?必須可以,今天我們就一起來實現帶返回值任務的線程池。

前情回顧

首先,讓我們先回顧一下上一章寫的線程池:

(1)它包含四個要素:核心線程數、最大線程數、任務隊列、拒絕策略;

(2)它具有執行無返回值任務的能力;

(3)它無法處理有返回值的任務;

(4)它無法處理任務執行的異常(線程中的異常不會拋出到線程外);

那麼,我們能不能在現有的基礎上實現其下麵兩項能力呢?讓我們一起來試一試吧!

有返回值和無返回值的任務到底有何不同?

答案很明顯,就是一個有返回值,一個無返回值,用偽代碼來表示就是下麵這樣:

    // 無返回值
    threadPool.execute(()->{
        System.out.println(1);
    });
    // 有返回值,分兩步走
    // 1. 提交任務到線程池中
    SomeClass result = threadPool.execute(()->{
        System.out.println(1);
        return 1;
    });
    // 2. 等待任務的結果返回
    Object value = result.get();
    

無返回值的任務提交了就完事,主線程並不Care它到底有沒有執行完,並不關心它是不是拋出異常,主線程Just提交線程到線程池中,其餘什麼都不管。

有返回值的任務就不一樣了,主線程首先要提交任務到線程池中,它需要使用到任務執行的結果,所以它必須等待任務執行完畢才能拿到任務執行的結果。

那麼,為什麼不直接在execute的時候就等待任務執行完畢呢?這樣的話那不就跟串列沒啥區別了,還不如直接在主線程執行任務呢,還少了線程切換的資源消耗。

之所以要分成兩步,是因為主線程並不一定需要立即獲取返回值,在需要用到返回值的時候才去get,這樣就可以在提交任務和獲取返回值之間幹些其它的事情,提高效率。

所以,提交任務的時候不需要阻塞,get返回值的時候才可能需要阻塞,如果get的時候任務已經執行完畢了,這時候也不需要阻塞,如果get的時候任務還未執行完畢,那就要阻塞等待任務執行完畢才能獲取到返回值。

實現分析

首先,無返回值的任務我們直接使用的Runnable函數式介面,有返回值的任務有沒有現成的介面呢?還真有,那就是Callable介面,它有個返回值。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

其次,提交任務的時候需要有個返回值,它是在將來用來獲取任務執行結果的,實際上它也是新任務的一種能力,可以使用它對任務進行包裝,使其具有返回值的能力。

public interface Future<T> {
    T get();
}

再次,我們需要給現有的線程池增加一種新的能力,根據單一職責原則,我們定義一個新的介面來承載這種能力。

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}

然後,我們需要一種新的任務,它既具有舊任務的執行能力(run()方法),又具有新任務的返回值能力(get()方法),所以我們造一個“將來的任務”對提交的任務進行包裝,使其具有返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任務
     */
    private Callable<T> task;
    
    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // 具體實現...
    }

    @Override
    public T get() {
        // 具體實現...
    }
}

最後,我們只要對原有的線程池進行擴展,將提交的任務包裝成“將來獲取返回值的任務”,還是使用原來的方法去執行,然後返回這個將來的任務即可。

根據開閉原則,【本篇文章由公眾號“彤哥讀源碼”原創】原來的代碼我們不做任何修改,擴展新的子類來實現新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 包裝成將來獲取返回值的任務
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 還是使用原來的執行能力
        execute(futureTask);
        // 返回將來的任務,只需要返回其get返回值的能力即可
        // 所以這裡返回的是Future而不是FutureTask類型
        return futureTask;
    }
}

好了,到這裡整體的邏輯我們就已經比較清晰地實現完了,還剩下最關鍵的部分,這個“將來的任務”的兩個能力要如何實現。

將來的任務

將來的任務,具有兩個能力:一是執行真正任務的能力,二是將來獲取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {
        // 具體實現...
    }

    @Override
    public T get() {
        // 具體實現...
    }
}

首先,我們要明確一件事,任務的執行是線程池中,獲取返回值是在主線程中,它們是在兩個線程中執行的,而且誰先誰後我們無法確定。

其次,如果run()在get()之前執行,我們需要告訴get()任務已經執行完畢了,所以需要一個狀態來通知這個事,還需要一個變數來承載任務執行的返回值。

    /**
     * 任務執行的狀態,0未開始,1正常完成,2異常完成
     * 也可以使用volatile+Unsafe實現CAS操作
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任務執行的結果【本篇文章由公眾號“彤哥讀源碼”原創】
     * 如果執行正常,返回結果為T
     * 如果執行異常,返回結果為Exception
     */
    private Object result;

再次,如果get()在run()之前執行,那就需要阻塞等待run()執行完畢才能拿到返回值,所以需要保存調用者(主線程),get()的時候park阻塞住,run()完成了unpark喚醒它來拿返回值。

    /**
     * 調用者線程
     * 也可以使用volatile+Unsafe實現CAS操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然後,我們先來看看run()方法的邏輯,它其實就是先執行真正的任務,然後修改狀態為完成,並保存任務的返回值,如果保存了主線程,還要喚醒它。

    @Override
    public void run() {
        // 如果狀態不是NEW,說明執行過了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 執行任務【本篇文章由公眾號“彤哥讀源碼”原創】
            T r = task.call();
            // CAS更新state的值為FINISHED
            // 如果更新成功,就把r賦值給result
            // 如果更新失敗,說明state的值不為NEW了,也就是任務已經執行過了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必須放在修改state裡面,見下麵的分析
                finish();
            }
        } catch (Exception e) {
            // 如果CAS更新state的值為EXCEPTION成功,就把e賦值給result
            // 如果CAS更新失敗,說明state的值不為NEW了,也就是任務已經執行過了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必須放在修改state裡面,見下麵的分析
                finish();
            }
        }
    }

    private void finish() {
        // 檢查調用者是否為空,如果不為空,喚醒它
        // 調用者在調用get()方法的進入阻塞狀態
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }

最後,我們再看看get()方法,如果任務還未執行,就阻塞等待任務的執行;如果任務已經執行完畢了,直接拿返回值即可;但是,還有一種情況,get()方法執行的過程中run()方法也在執行,所以get()方法中的每一步都要檢查狀態的值有沒有變化。

@Override
    public T get() {
        int s = state.get();
        // 如果任務還未執行完成,判斷當前線程是否要進入阻塞狀態
        if (s == NEW) {
            // 標識調用者線程是否被標記過
            boolean marked = false;
            for (;;) {
                // 重新獲取state的值
                s = state.get();
                // 如果state大於NEW說明完成了,跳出迴圈
                if (s > NEW) {
                    break;
                    // 此處必須把caller的CAS更新和park()方法分成兩步處理,不能把park()放在CAS裡面
                } else if (!marked) {
                    // 嘗試更新調用者線程
                    // 試想斷點停在此處【本篇文章由公眾號“彤哥讀源碼”原創】
                    // 此時state為NEW,讓run()方法執行到底,它不會執行finish()中的unpark()方法
                    // 這時打開斷點,這裡會更新caller成功,但是迴圈從頭再執行一遍發現state已經變了,
                    // 直接在上面的if(s>NEW)處跳出迴圈了,因為finish()在修改state內部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 調用者線程更新之後park當前線程
                    // 試想斷點停在此處
                    // 此時state為NEW,讓run()方法執行到底,因為上面的caller已經設置值了,
                    // 所以會執行finish()方法中的unpark()方法,
                    // 這時再打開斷點,這裡不會park信
                    // 見unpark()方法的註釋,上面寫得很清楚:
                    // 如果線程執行了park()方法,那麼執行unpark()方法會喚醒那個線程
                    // 如果先執行了unpark()方法,那麼線程下一次執行park()方法將不會阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在我們的實現中,如果任務執行的過程拋出異常了,也是通過result返回給主線程,這樣主線程就拿到了這個異常,它就可以做相應的處理了。

好了,完整的實現到此結束,不知道你領悟了沒有。

測試用例

最後奉上測試代碼:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

運行結果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒絕的任務
【本篇文章由公眾號“彤哥讀源碼”原創】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

總結

(1)有返回值的任務是通過包裝成將來的任務來實現的,這個任務既具有基本的執行能力,又具有將來獲取返回值的能力;

(2)任務執行的異常跟任務正常的返回值是通過同一個返回值返回到主線程的,主線程根據狀態判斷是異常還是正常值;

(3)我們的實現中運用了單一職責原則、開閉原則等設計原則,對原有代碼沒有造成任何的入侵;

彩蛋

手寫線程池目前只打算寫這兩章,後面開始進入jdk原生線程池的源碼分析,敬請期待。

另外,需要手寫線程池完整源碼的同學請關註我的公眾號“彤哥讀源碼”,在後臺回覆“MyThreadPool”(不帶引號)即可領取手寫線程池完整源碼,註意大小寫不要弄錯哦,否則彤哥是不會給你的哈。


歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Java中使用SimpleDateFormat 進行日期格式化類 SimpleDateFormat 日期格式化類 示例 1 : 日期轉字元串 y 代表年 M 代表月 d 代表日 H 代表24進位的小時 h 代表12進位的小時 m 代表分鐘 s 代表秒 S 代表毫秒 package date; imp ...
  • 前言 在SpringCache緩存初探中我們研究瞭如何利用spring cache已有的幾種實現快速地滿足我們對於緩存的需求。這一次我們有了新的更個性化的需求,想在一個請求的生命周期里實現緩存。 需求背景是:一次數據的組裝需要調用多個方法,然而在這多個方法里又會調用同一個IO介面,此時多浪費了一次I ...
  • 一、mybatis歷史: 額,學習一門新事物時瞭解一下它的歷史能在腦中形成一個大致的輪廓...進入正題... mybatis以前叫做ibatis,ibatis是Apache旗下的產品,在2010年時google將其接管並更名為mybatis。(你可以發現它的包結構還是org.apache.ibati ...
  • 一、封裝的步驟 (1)所有屬性私有化,使用private關鍵字進行修飾,private表示私有的,修飾的所有數據只能在本類中進行訪問。 (2)對外提供簡單的操作入口,也就是說以後外部程式要想訪問age屬性的話,必須通過這些簡單的入口才能進行訪問。 i.對外提供兩個公開的方法,分別是set方法和get ...
  • python day 7 2019/10/10 學習資料來自老男孩教育 [TOC] 1. time模塊 若要使用python的time模塊,需要先導入time. 電腦有兩種標準時間,一種是基於世界時間的時間戳(即給定日期距1970年1月1日的秒數),另外一種是struct_time對象的9個數字的 ...
  • 待補 ...
  • HandlerMethodArgumentResolver 是什麼? 就是用於解析參數的一個介面,springMVC(4.1)會直接調用這個介面的方法,對參數進行一定的解析。比如會在 具體實現: 1.@CurrentUser @Target({ElementType.PARAMETER}) @Ret ...
  • 1、pycharm創建ini格式的文件,沒有對應的 ini 文件類型 需要更新 Ini 2、setting–>marketplace 搜索 Ini ,然後進行安裝,重啟pycharm 3、重啟pycharm後,之前創建的 ini 文件會自動更改為 正確的 ini 文件圖標 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...