掌握JDK21全新結構化併發編程,輕鬆提升開發效率!

来源:https://www.cnblogs.com/JavaEdge/archive/2023/08/22/17648963.html
-Advertisement-
Play Games

## 1 概要 通過引入結構化併發編程的API,簡化併發編程。結構化併發將在不同線程中運行的相關任務組視為單個工作單元,從而簡化錯誤處理和取消操作,提高可靠性,並增強可觀察性。這是一個預覽版的API。 ## 2 歷史 結構化併發是由JEP 428提出的,併在JDK 19中作為孵化API發佈。它在JD ...


1 概要

通過引入結構化併發編程的API,簡化併發編程。結構化併發將在不同線程中運行的相關任務組視為單個工作單元,從而簡化錯誤處理和取消操作,提高可靠性,並增強可觀察性。這是一個預覽版的API。

2 歷史

結構化併發是由JEP 428提出的,併在JDK 19中作為孵化API發佈。它在JDK 20中被JEP 437重新孵化,通過對作用域值(JEP 429)進行輕微更新。

我們在這裡提議將結構化併發作為JUC包中的預覽API。唯一重要變化是StructuredTaskScope::fork(...)方法返回一個[子任務],而不是一個Future,如下麵所討論的。

3 目標

推廣一種併發編程風格,可以消除由於取消和關閉而產生的常見風險,如線程泄漏和取消延遲。

提高併發代碼的可觀察性。

4 非目標

不替換JUC包中的任何併發構造,如ExecutorService和Future。

不定義Java平臺的最終結構化併發API。其他結構化併發構造可以由第三方庫定義,或在未來的JDK版本中定義。

不定義線上程之間共用數據流的方法(即通道)。會在未來提出這樣做。

不用新的線程取消機制替換現有的線程中斷機制。會在未來提出這樣做。

5 動機

開發人員通過將任務分解為多個子任務來管理複雜性。在普通的單線程代碼中,子任務按順序執行。然而,如果子任務彼此足夠獨立,並且存在足夠的硬體資源,那麼通過在不同線程中併發執行子任務,可以使整個任務運行得更快(即具有較低的延遲)。例如,將多個I/O操作的結果組合成一個任務,如果每個I/O操作都在自己的線程中併發執行,那麼任務將運行得更快。虛擬線程(JEP 444)使得為每個此類I/O操作分配一個線程成為一種具有成本效益的方法,但是管理可能會產生大量線程仍然是一個挑戰。

6 ExecutorService 非結構化併發

java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,它幫助開發人員以併發方式執行子任務。

如下 handle() 的方法,它表示伺服器應用程式中的一個任務。它通過將兩個子任務提交給 ExecutorService 來處理傳入的請求。

ExecutorService 立即返回每個子任務的 Future,並根據 Executor 的調度策略同時執行這些子任務。handle() 方法通過阻塞調用它們的 Futureget() 方法來等待子任務的結果,因此該任務被稱為加入了其子任務。

Response handle() throws ExecutionException, InterruptedException {
    Future<String> user = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser = user.get();   // 加入 findUser
    int theOrder = order.get();    // 加入 fetchOrder
    return new Response(theUser, theOrder);
}

由於子任務併發執行,每個子任務都可獨立地成功或失敗。在這個上下文中,"失敗" 意味著拋出異常。通常,像 handle() 這樣的任務應該在任何一個子任務失敗時失敗。當出現失敗時,理解線程的生命周期會變得非常複雜:

  • findUser() 拋異常,那麼調用 user.get()handle() 也會拋出異常,但是 fetchOrder() 會繼續在自己的線程中運行。這是線程泄漏,最好情況下浪費資源,最壞情況下 fetchOrder() 的線程可能會幹擾其他任務。

  • 如執行 handle() 的線程被中斷,這個中斷不會傳播到子任務。findUser()fetchOrder() 的線程都會泄漏,即使在 handle() 失敗後仍然繼續運行。

  • 如果 findUser() 執行時間很長,但是在此期間 fetchOrder() 失敗,那麼 handle() 將不必要地等待 findUser(),因為它會在 user.get() 上阻塞,而不是取消它。只有在 findUser() 完成並且 user.get() 返回後,order.get() 才會拋出異常,導致 handle() 失敗。

每種case下,問題在於我們的程式在邏輯上被結構化為任務-子任務關係,但這些關係只存在於開發人員的頭腦中。這不僅增加錯誤可能性,還會使診斷和排除此類錯誤變得更加困難。例如,線程轉儲等可觀察性工具會在不相關的線程調用棧中顯示 handle()findUser()fetchOrder(),而沒有任務-子任務關係的提示。

可嘗試在錯誤發生時顯式取消其他子任務,例如通過在失敗的任務的 catch 塊中使用 try-finally 包裝任務,並調用其他任務的 Futurecancel(boolean) 方法。我們還需要在 try-with-resources 語句中使用 ExecutorService,就像

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 10_000).forEach(i -> {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            return i;
        });
    });
}  // executor.close() is called implicitly, and waits

因為 Future 沒有提供等待被取消的任務的方法。但所有這些都很難做到,並且往往會使代碼的邏輯意圖變得更加難以理解。跟蹤任務之間的關係,並手動添加所需的任務間取消邊緣,是對開發人員的一種很大要求。

無限制的併發模式

這種需要手動協調生命周期的需求是因為 ExecutorServiceFuture 允許無限制的併發模式。在涉及的所有線程中,沒有限制或順序:

  • 一個線程可以創建一個 ExecutorService
  • 另一個線程可以向其提交工作
  • 執行工作的線程與第一個或第二個線程沒有任何關係

線程提交工作之後,一個完全不同的線程可以等待執行的結果。具有對 Future 的引用的任何代碼都可以加入它(即通過調用 get() 等待其結果),甚至可以在與獲取 Future 的線程不同的線程中執行代碼。實際上,由一個任務啟動的子任務不必返回到提交它的任務。它可以返回給許多任務中的任何一個,甚至可能是沒有返回給任何任務。

因為 ExecutorServiceFuture 允許這種無結構的使用,它們既不強制執行也不跟蹤任務和子任務之間的關係,儘管這些關係是常見且有用的。因此,即使子任務在同一個任務中被提交和加入,一個子任務的失敗也不能自動導致另一個子任務的取消。在上述的 handle() 方法中,fetchOrder() 的失敗不能自動導致 findUser() 的取消。fetchOrder()FuturefindUser()Future 沒有關係,也與最終通過其 get() 方法加入它的線程無關。與其要求開發人員手動管理這種取消,我們希望能夠可靠地自動化這一過程。

任務結構應反映代碼結構

ExecutorService 下的自由線程組合相反,單線程代碼的執行總是強制執行任務和子任務的層次結構。方法的代碼塊 {...} 對應一個任務,代碼塊內部調用的方法對應子任務。調用的方法必須返回給調用它的方法,或者拋出異常給調用它的方法。它不能生存於調用它的方法之外,也不能返回或拋出異常給其他方法。因此,所有子任務在任務之前完成,每個子任務都是其父任務的子任務,每個子任務的生命周期相對於其他子任務和任務來說,都由代碼塊結構的語法規則來管理。

如單線程版本的 handle() 中,任務-子任務關係在語法結構明顯:

Response handle() throws IOException {
    String theUser = findUser();
    int theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

我們不會在 findUser() 子任務完成之前啟動 fetchOrder() 子任務,無論 findUser() 是成功還是失敗。如果 findUser() 失敗,我們根本不會啟動 fetchOrder(),而且 handle() 任務會隱式地失敗。一個子任務只能返回給其父任務,這是很重要的:這意味著父任務可以將一個子任務的失敗隱式地視為觸發來取消其他未完成的子任務,然後自己失敗。

單線程代碼中,任務-子任務層次關係在運行時的調用棧中得到體現。因此,我們獲得了相應的父子關係,這些關係管理著錯誤傳播。觀察單個線程時,層次關係顯而易見:findUser()(及後來的 fetchOrder())似乎是在 handle() 下執行的。這使得回答問題 "handle() 正在處理什麼?" 很容易。

如任務和子任務之間的父子關係在代碼的語法結構中明顯,並且在運行時得到了體現,那併發編程將更加容易、可靠且易於觀察,就像單線程代碼一樣。語法結構將定義子任務的生命周期,並使得能夠在運行時創建一個類似於單線程調用棧的線程層次結構的表示。這種表示將實現錯誤傳播、取消以及對併發程式的有意義的觀察。

7 結構化併發

結構化併發是一種併發編程方法,它保持了任務和子任務之間的自然關係,從而實現了更具可讀性、可維護性和可靠性的併發代碼。"結構化併發" 這個術語由 Martin Sústrik 提出,並由 Nathaniel J. Smith 推廣。從其他編程語言中的概念,如 Erlang 中的層次監控者,可以瞭解到結構化併發中錯誤處理的設計思想。

結構化併發源於一個簡單的原則:

如果一個任務分解為併發的子任務,那麼所有這些子任務都會返回到同一個地方,即任務的代碼塊。

在結構化併發中,子任務代表任務工作。任務等待子任務的結果並監視它們的失敗情況。與單線程代碼中的結構化編程技術類似,結構化併發在多線程中的威力來自於兩個思想:

  • 為代碼塊中的執行流程定義明確的進入和退出點
  • 在嚴格的操作生命周期嵌套中,以反映它們在代碼中的語法嵌套方式

由於代碼塊的進入和退出點被明確定義,因此併發子任務的生命周期被限定在其父任務的語法塊中。因為同級子任務的生命周期嵌套在其父任務的生命周期之內,因此可以將它們作為一個單元進行推理和管理。由於父任務的生命周期,依次嵌套在其父任務的生命周期之內,運行時可以將任務層次結構實現為樹狀結構,類似於單線程調用棧的併發對應物。這允許代碼為任務子樹應用策略,如截止時間,並允許可觀察性工具將子任務呈現為父任務的下屬。

結構化併發非常適合虛擬線程,這是由JDK實現的輕量級線程。許多虛擬線程可以共用同一個操作系統線程,從而可以支持非常大量的虛擬線程。除此外,虛擬線程足夠廉價,可以表示任何涉及I/O等併發行為。這意味著伺服器應用程式可以使用結構化併發來同時處理成千上萬甚至百萬個傳入請求:它可以為處理每個請求的任務分配一個新的虛擬線程,當一個任務通過提交子任務進行併發執行時,它可以為每個子任務分配一個新的虛擬線程。在幕後,任務-子任務關係通過為每個虛擬線程提供一個對其唯一父任務的引用來實現為樹狀結構,類似於調用棧中的幀引用其唯一的調用者。

總之,虛擬線程提供了大量的線程。結構化併發可以正確且強大地協調它們,並使可觀察性工具能夠按照開發人員的理解顯示線程。在JDK中擁有結構化併發的API將使構建可維護、可靠且可觀察的伺服器應用程式變得更加容易。

8 描述

結構化併發 API 的主要類是 java.util.concurrent 包中的 StructuredTaskScope。該類允許開發人員將一個任務結構化為一組併發的子任務,並將它們作為一個單元進行協調。子任務通過分別分叉它們並將它們作為一個單元加入,可能作為一個單元取消,來在它們自己的線程中執行。子任務的成功結果或異常由父任務彙總並處理。StructuredTaskScope 將子任務的生命周期限制在一個清晰的詞法作用域內,在這個作用域中,任務與其子任務的所有交互(分叉、加入、取消、處理錯誤和組合結果)都發生。

前面提到的 handle() 示例,使用 StructuredTaskScope 編寫:

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String> user = scope.fork(() -> findUser());
        Supplier<Integer> order = scope.fork(() -> fetchOrder());

        scope.join()             // 加入兩個子任務
             .throwIfFailed();   // ... 並傳播錯誤

        // 兩個子任務都成功完成,因此組合它們的結果
        return new Response(user.get(), order.get());
    }
}

與原始示例相比,理解涉及的線程的生命周期在這裡變得更加容易:在所有情況下,它們的生命周期都限制在一個詞法作用域內,即 try-with-resources 語句的代碼塊內。此外,使用 StructuredTaskScope 可以確保一些有價值的屬性:

  1. 錯誤處理與短路 — 如果 findUser()fetchOrder() 子任務中的任何一個失敗,另一個如果尚未完成則會被取消。(這由 ShutdownOnFailure 實現的關閉策略來管理;還有其他策略可能)。
  2. 取消傳播 — 如果在運行 handle() 的線程在調用 join() 之前或之中被中斷,則線程在退出作用域時會自動取消兩個子任務。
  3. 清晰性 — 上述代碼具有清晰的結構:設置子任務,等待它們完成或被取消,然後決定是成功(並處理已經完成的子任務的結果)還是失敗(子任務已經完成,因此沒有更多需要清理的)。
  4. 可觀察性 — 如下所述,線程轉儲清楚地顯示了任務層次結構,其中運行 findUser()fetchOrder() 的線程被顯示為作用域的子任務。

9 突破預覽版限制

StructuredTaskScope 是預覽版 API,預設禁用。要使用 StructuredTaskScope API,需啟用預覽 API:

  1. 使用 javac --release 21 --enable-preview Main.java 編譯程式,然後使用 java --enable-preview Main 運行它;或
  2. 當使用源代碼啟動器時,使用 java --source 21 --enable-preview Main.java 運行程式
  3. IDEA 運行時,勾選即可:

10 使用 StructuredTaskScope

10.1 API

public class StructuredTaskScope<T> implements AutoCloseable {

    public <U extends T> Subtask<U> fork(Callable<? extends U> task);
    public void shutdown();

    public StructuredTaskScope<T> join() throws InterruptedException;
    public StructuredTaskScope<T> joinUntil(Instant deadline)
        throws InterruptedException, TimeoutException;
    public void close();

    protected void handleComplete(Subtask<? extends T> handle);
    protected final void ensureOwnerAndJoined();

}

10.2 工作流程

  1. 創建一個作用域。創建作用域的線程是其所有者。
  2. 使用 fork(Callable) 方法在作用域中分叉子任務。
  3. 在任何時間,任何子任務,或者作用域的所有者,都可以調用作用域的 shutdown() 方法來取消未完成的子任務並阻止分叉新的子任務。
  4. 作用域的所有者將作用域(即所有子任務)作為一個單元加入。所有者可以調用作用域的 join() 方法,等待所有子任務已完成(無論成功與否)或通過 shutdown() 被取消。或者,它可以調用作用域的 joinUntil(java.time.Instant) 方法,等待直到截止時間。
  5. 加入後,處理子任務中的任何錯誤並處理其結果。
  6. 關閉作用域,通常通過隱式使用 try-with-resources 實現。這會關閉作用域(如果尚未關閉),並等待被取消但尚未完成的任何子任務完成。

每次調用 fork(...) 都會啟動一個新線程來執行一個子任務,預設情況下是虛擬線程。一個子任務可以創建它自己的嵌套的 StructuredTaskScope 來分叉它自己的子任務,從而創建一個層次結構。該層次結構反映在代碼的塊結構中,限制了子任務的生命周期:在作用域關閉後,所有子任務的線程都保證已終止,當塊退出時不會留下任何線程。

在作用域中的任何子任務,嵌套作用域中的任何子子任務,以及作用域的所有者,都可以隨時調用作用域的 shutdown() 方法,表示任務已完成,即使其他子任務仍在執行。shutdown() 方法會中斷仍在執行子任務的線程,並導致 join()joinUntil(Instant) 方法返回。因此,所有子任務都應該被編寫為響應中斷。在調用 shutdown() 後分叉的新子任務將處於 UNAVAILABLE 狀態,不會被運行。實際上,shutdown() 是順序代碼中 break 語句的併發模擬。

在作用域內部調用 join()joinUntil(Instant) 是強制性的。如果作用域的代碼塊在加入之前退出,則作用域將等待所有子任務終止,然後拋出異常。

作用域的所有者線程可能在加入之前或加入期間被中斷。例如,它可能是封閉作用域的子任務。如果發生這種情況,則 join()joinUntil(Instant) 將拋出異常,因為繼續執行沒有意義。然後,try-with-resources 語句將關閉作用域,取消所有子任務並等待它們終止。這的效果是自動將任務的取消傳播到其子任務。如果 joinUntil(Instant) 方法的截止時間在子任務終止或調用 shutdown() 之前到期,則它將拋出異常,再次,try-with-resources 語句將關閉作用域。

join() 成功完成時,每個子任務已經成功完成、失敗或因作用域被關閉而被取消。

一旦加入,作用域的所有者會處理失敗的子任務並處理成功完成的子任務的結果;這通常是通過關閉策略來完成的(見下文)。成功完成的任務的結果可以使用 Subtask.get() 方法獲得。get() 方法永遠不會阻塞;如果錯誤地在加入之前或子任務尚未成功完成時調用它,則會拋出 IllegalStateException

在作用域中分叉任務的子任務時,會繼承 ScopedValue 綁定(JEP 446)。如果作用域的所有者從綁定的 ScopedValue 中讀取值,則每個子任務將讀取相同的值。

如果作用域的所有者本身是現有作用域的子任務,即作為分叉子任務創建的,則該作用域成為新作用域的父作用域。因此,作用域和子任務形成一個樹狀結構。

在運行時,StructuredTaskScope 強制執行結構和順序併發操作。因此,它不實現 ExecutorServiceExecutor 介面,因為這些介面的實例通常以非結構化方式使用(見下文)。然而,將使用 ExecutorService 的代碼遷移到使用 StructuredTaskScope 並從結構上受益是直接的。

實際上,大多數使用 StructuredTaskScope 的情況下,可能不會直接使用 StructuredTaskScope 類,而是使用下一節描述的兩個實現了關閉策略的子類之一。在其他情況下,用戶可能會編寫自己的子類來實現自定義的關閉策略。

11 關閉策略

在處理併發子任務時,通常會使用短路模式來避免不必要的工作。有時,例如,如果其中一個子任務失敗,就會取消所有子任務(即同時調用所有任務),或者在其中一個子任務成功時取消所有子任務(即同時調用任何任務)。StructuredTaskScope 的兩個子類,ShutdownOnFailureShutdownOnSuccess,支持這些模式,並提供在第一個子任務失敗或成功時關閉作用域的策略。

關閉策略還提供了集中處理異常以及可能的成功結果的方法。這符合結構化併發的精神,即整個作用域被視為一個單元。

11.1 案例

上面的 handle() 示例也使用了這策略,它在併發運行一組任務併在其中任何一個任務失敗時失敗:

<T> List<T> runAll(List<Callable<T>> tasks) 
        throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
        scope.join()
             .throwIfFailed();  // 任何子任務失敗,拋異常
        // 在這裡,所有任務都已成功完成,因此組合結果
        return suppliers.stream().map(Supplier::get).toList();
    }
}

在第一個成功的子任務返回結果後返回該結果:

<T> T race(List<Callable<T>> tasks, Instant deadline) 
        throws InterruptedException, ExecutionException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task);
        }
        return scope.joinUntil(deadline)
                    .result();  // 如果沒有任何子任務成功完成,拋出異常
    }
}

一旦有一個子任務成功,此作用域將自動關閉,取消未完成的子任務。如果所有子任務失敗或給定的截止時間過去,任務將失敗。這種模式在需要從一組冗餘服務中獲得任何一個服務的結果的伺服器應用程式中非常有用。

雖然這倆關閉策略已內置,但開發人員可以創建自定義策略來抽象其他模式。

11.2 處理結果

在通過關閉策略(例如,通過 ShutdownOnFailure::throwIfFailed)進行集中異常處理和加入之後,作用域的所有者可以使用從調用 fork(...) 返回的 [Subtask] 對象處理子任務的結果,如果這些結果沒有被策略處理(例如,通過 ShutdownOnSuccess::result())。

通常情況下,作用域所有者將只調用 get() 方法的 Subtask 方法。所有其他的 Subtask 方法通常只會在自定義關閉策略的 handleComplete(...) 方法的實現中使用。實際上,我們建議將引用由 fork(...) 返回的 Subtask 的變數類型定義為 Supplier<String> 而不是 Subtask<String>(除非當然選擇使用 var)。如果關閉策略本身處理子任務結果(如在 ShutdownOnSuccess 的情況下),則應完全避免使用由 fork(...) 返回的 Subtask 對象,並將 fork(...) 方法視為返回 void。子任務應將其結果作為它們的返回結果,作為策略在處理中央異常後應處理的任何信息。

如果作用域所有者處理子任務異常以生成組合結果,而不是使用關閉策略,則異常可以作為從子任務返回的值返回。例如,下麵是一個在並行運行一組任務並返回包含每個任務各自成功或異常結果的完成 Future 列表的方法:

<T> List<Future<T>> executeAll(List<Callable<T>> tasks)
        throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    	  List<? extends Supplier<Future<T>>> futures = tasks.stream()
    	      .map(task -> asFuture(task))
     	      .map(scope::fork)
     	      .toList();
    	  scope.join();
    	  return futures.stream().map(Supplier::get).toList();
    }
}

static <T> Callable<Future<T>> asFuture(Callable<T> task) {
   return () -> {
       try {
           return CompletableFuture.completedFuture(task.call());
       } catch (Exception ex) {
           return CompletableFuture.failedFuture(ex);
       }
   };
}

11.3 自定義關閉策略

StructuredTaskScope 可以被擴展,並且可以覆蓋其受保護的 handleComplete(...) 方法,以實現除 ShutdownOnSuccessShutdownOnFailure 之外的其他策略。子類可以,例如:

  • 收集成功完成的子任務的結果,並忽略失敗的子任務,
  • 在子任務失敗時收集異常,或者
  • 在出現某種條件時調用 shutdown() 方法以關閉並導致 join() 方法喚醒。

當一個子任務完成時,即使在調用 shutdown() 之後,它也會作為一個 Subtask 報告給 handleComplete(...) 方法:

public sealed interface Subtask<T> extends Supplier<T> {
    enum State { SUCCESS, FAILED, UNAVAILABLE }

    State state();
    Callable<? extends T> task();
    T get();
    Throwable exception();
}

當子任務在 SUCCESS 狀態或 FAILED 狀態下完成時,handleComplete(...) 方法將被調用。如果子任務處於 SUCCESS 狀態,可以調用 get() 方法,如果子任務處於 FAILED 狀態,則可以調用 exception() 方法。在其他情況下調用 get()exception() 會引發 IllegalStateException 異常。UNAVAILABLE 狀態表示以下情況之一:(1)子任務被 fork 但尚未完成;(2)子任務在關閉後完成,或者(3)子任務在關閉後被 fork,因此尚未啟動。handleComplete(...) 方法永遠不會為處於 UNAVAILABLE 狀態的子任務調用。

子類通常會定義方法,以使結果、狀態或其他結果在 join() 方法返回後可以被後續代碼使用。收集結果並忽略失敗子任務的子類可以定義一個方法,該方法返回一系列結果。實施在子任務失敗時關閉的策略的子類可以定義一個方法,以獲取失敗的第一個子任務的異常。

擴展 StructuredTaskScope 的子類

該子類收集成功完成的子任務的結果。它定義了 results() 方法,供主任務用於檢索結果。

class MyScope<T> extends StructuredTaskScope<T> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    MyScope() { super(null, Thread.ofVirtual().factory()); }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS)
            results.add(subtask.get());
    }

    @Override
    public MyScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    // 返回從成功完成的子任務獲取的結果流
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }

}

可以像這樣使用這個自定義策略:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = new MyScope<T>()) {
        for (var task : tasks) scope.fork(task);
        return scope.join()
                    .results().toList();
    }
}

扇入場景

上面的示例側重於扇出場景,這些場景管理多個併發的出站 I/O 操作。StructuredTaskScope 在扇入場景中也非常有用,這些場景管理多個併發的入站 I/O 操作。在這種情況下,我們通常會響應傳入請求而動態地創建未知數量的子任務。

以下是一個伺服器的示例,它在 StructuredTaskScope 中 fork 子任務以處理傳入連接:

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept();
                scope.fork(() -> handle(socket));
            }
        } finally {
            // 如果發生錯誤或被中斷,我們停止接受連接
            scope.shutdown();  // 關閉所有活動連接
            scope.join();
        }
    }
}

從併發的角度來看,這種情況與請求的方向不同,但在持續時間和任務數量方面是不同的,因為子任務是根據外部事件動態 fork 的。

所有處理連接的子任務都在作用域內創建,因此線上程轉儲中很容易看到它們在一個作用域的所有者的子線程。作用域的所有者也很容易被當作一個單元關閉整個服務。

可觀察性

我們擴展了由 JEP 444 添加的新的 JSON 線程轉儲格式,以顯示 StructuredTaskScope 將線程分組成層次結構:

$ jcmd <pid> Thread.dump_to_file -format=json <file>

每個作用域的 JSON 對象包含一個線程數組,這些線程在作用域中被 fork,並附帶它們的堆棧跟蹤。作用域的所有者線程通常會在 join() 方法中被阻塞,等待子任務完成;線程轉儲可以通過顯示由結構化併發所施加的樹狀層次結構,輕鬆地查看子任務的線程正在做什麼。作用域的 JSON 對象還具有對其父級的引用,以便可以從轉儲中重新構建程式的結構。

com.sun.management.HotSpotDiagnosticsMXBean API 也可以用來生成這樣的線程轉儲,可以通過平臺的 MBeanServer 和本地或遠程的 JMX 工具直接或間接地使用它。

為什麼 fork(...) 沒有返回 Future

StructuredTaskScope API 處於孵化狀態時,fork(...) 方法返回了 Future。這使得 fork(...) 更像是現有的 ExecutorService::submit 方法,從而提供了一種熟悉的感覺。然而,考慮到 StructuredTaskScope 的使用方式與 ExecutorService 完全不同 — 即以上文描述的結構化方式使用 — 使用 Future 帶來的更多困惑遠遠超過了清晰性。

熟悉的 Future 的使用涉及調用其 get() 方法,它會阻塞直到結果可用。但在 StructuredTaskScope 的上下文中,以這種方式使用 Future 不僅是不鼓勵的,而且是不切實際的。Structured Future 對象應該只有在 join() 返回之後查詢,此時它們已知已完成或取消,而應使用的方法不是熟悉的 get(),而是新引入的 resultNow(),它永遠不會阻塞。

一些開發人員想知道為什麼 fork(...) 沒有返回更強大的 CompletableFuture 對象。由於應該只有在已知它們已完成時才使用 fork(...) 返回的 Future,因此 CompletableFuture 不會提供任何好處,因為其高級功能只對未完成的 futures 有用。此外,CompletableFuture 是為非同步編程範例設計的,而 StructuredTaskScope 鼓勵阻塞範例。

總之,FutureCompletableFuture 的設計旨在提供在結構化併發中是有害的自由度。

結構化併發是將在不同線程中運行的多個任務視為單個工作單元,而 Future 主要在將多個任務視為單獨任務時有用。因此,作用域只應該阻塞一次以等待其子任務的結果,然後集中處理異常。因此,在絕大多數情況下,從 fork(...) 返回的 Future 上唯一應該調用的方法是 resultNow()。這是與 Future 的正常用法的顯著變化,而 Subtask::get() 方法的行為與在 API 孵化期間 Future::resultNow() 的行為完全相同。

替代方案

增強 ExecutorService 介面。我們對該介面進行了原型實現,該介面始終強制執行結構化並限制了哪些線程可以提交任務。然而,我們發現這在 JDK 和生態系統中的大多數使用情況下都不是結構化的。在完全不同的概念中重用相同的 API,會導致混淆。例如,將結構化 ExecutorService 實例傳遞給現有接受此類型的方法,幾乎肯定會在大多數情況下拋出異常。

本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本次安裝的推薦配置: CentOS 7 (迅雷種子 http://ftp.nara.wide.ad.jp/pub/Linux/centos/7.9.2009/isos/x86_64/CentOS-7-x86_64-DVD-2009.torrent) nginx 1.24.0 mysql 5.7.43 ...
  • 在當今的數字時代,大規模數據處理和分析已經成為了企業和組織中不可或缺的一部分。為了有效地處理和分析海量的數據,Hadoop生態系統應運而生。本文將深入探討Hadoop生態系統的工作原理,介紹其關鍵組件以及如何使用它來處理和分析大規模數據。 ## 什麼是Hadoop? Hadoop是一個開源的分散式計 ...
  • Infinispan 是一個基於分散式系統的記憶體數據存儲和緩存平臺,它的集群實現原理涉及到節點的發現和通信。在 Infinispan 中,集群是由多個節點組成的,每個節點都存儲著數據的一部分,並且通過通信來保持數據的一致性和可用性。 Infinispan 集群的實現原理主要包括以下幾個關鍵點: 1. ...
  • # 簡介 Spring Boot Admin(SBA)是一個針對spring-boot的actuator介面進行UI美化封裝的監控工具。它可以:在列表中瀏覽所有被監控spring-boot項目的基本信息,詳細的Health信息、記憶體信息、JVM信息、垃圾回收信息,還可以直接修改logger日誌的le ...
  • # 應用場景 * 用戶下單5分鐘後,給他發簡訊 * 用戶下單30分鐘後,如果用戶不付款就自動取消訂單 # kafka無死信隊列 kafka本身沒有這種延時隊列的機制,像rabbitmq有自己的死信隊列,當一些消息在一定時間不消費時會發到死信隊列,由死信隊列來處理它們,上面的兩個需求如果是rabbit ...
  • 本文翻譯自國外論壇 medium,原文地址:https://levelup.gitconnected.com/how-i-deleted-more-than-1000-lines-of-code-using-spring-retry-9118de29060 > 使用 Spring Retry 重構代 ...
  • 通過一張圖描述清楚TuGraph Analytics的整體架構和關鍵設計,幫助大家快速瞭解TuGraph Analytics項目輪廓。 ...
  • ![](https://img2023.cnblogs.com/other/1218593/202308/1218593-20230822164212978-1679813836.png) ### 背景 有時候我們需要進行遠程的debug,本文研究如何進行遠程debug,以及使用 IDEA 遠程de ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...