Fork/Join

来源:https://www.cnblogs.com/changming06/archive/2023/12/01/17869432.html
-Advertisement-
Play Games

Fork/Join框架簡介 Fork/Join框架簡介 Fork/Join它可以將一個大的任務拆分成多個子任務並行處理,最後將子任務結果合成並最後的計算結果,併進行輸出。FOrk/Join框架要完成兩件事情。Fork:把一個複雜的任務進行分析,大任務拆分成小任務;Join:把拆分的結果進行合併。 1 ...


Fork/Join框架簡介

Fork/Join框架簡介

Fork/Join它可以將一個大的任務拆分成多個子任務並行處理,最後將子任務結果合成並最後的計算結果,併進行輸出。FOrk/Join框架要完成兩件事情。Fork:把一個複雜的任務進行分析,大任務拆分成小任務;Join:把拆分的結果進行合併。

1.任務分割,Fork/Join框架需要把大的任務分割成足夠小的任務,如果子任務比較大的話還要對子任務進行繼續分割。
2.執行任務併合並結果,分割的子任務分別放到雙端隊列里,然後幾個啟動線程,分別從雙端隊列里獲取任務執行。子任務執行完後的結果都放在另一個隊列里,啟動一個線程從隊列里取數據,然後合併這些數據。

雙端隊列

雙端隊列(Double-ended Queue),簡稱Deque,是一種具有隊列和棧特性的數據結構。雙端隊列允許從兩端插入和刪除元素,因此可以在隊列的頭部和尾部進行插入和和刪除操作。
雙端隊列可以在隊列的兩端執行以下操作:

  • 在隊列頭部插入/刪除元素;
  • 在隊列尾部插入/刪除元素;
    雙端隊列的特點是可以在隊列的兩端進行插入和刪除操作,這使得它非常靈活和方便。它可以用於模擬棧,隊列和其他數據結構,同時也可以用於解決一些特定的問題,如滑動視窗問題等。在Java中,Deque介面是雙端隊列的標準實現,它有兩個主要的實現類:ArrayDeque,LinkedList。

Fork/Join框架實現任務分割和合併

  • 1.ForkJoinaTask:要使用Fork/Join框架,首先要創建一個ForkJoin任務,該類提供了在任務中執行fork/join的機制。通常情況下,我們不使用ForkJoin的子類ForkJoinTask,只需要繼承ForkJoinTask子類,然後使用,Fork/join框架提供了兩個子類,RecursiveAction 用以沒有返回結果的任務,RecursiveTask 用以有返回結果的任務。
  • 2.ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。
  • 3.RecursiveTask:繼承此類後,可以自定義調用的任務

ForkJoinPool架構圖

RecursiveTask架構圖

Fork/Join實現原理

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放任務,以及將這些任務提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。

Fork方法的實現原理

當我們調用ForkJoinTask的fork方法時,程式會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,非同步地執行這個任務,然後立即返回結果。

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

push方法把當前任務存放在ForkJoinTask數組隊列里。然後再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程執行任務。

    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    p.signalWork(p.workQueues, this);//執行任務
            }
            else if (n >= m)
                growArray();
        }
    }

Join方法

Join方法的主要作用是阻塞當前線程並等待獲取結果。

	/**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)//執行doJoin() 根據返回狀態判斷返回結果
            reportException(s);
        return getRawResult();
    }

任務狀態有4種 已完成(NORMAL),被取消(CANCELLED),信號(SIGNAL)和出現異常(EXCEPTIONAL)

doJoin方法

/**
Implementation for join, get, quietlyJoin. Directly handles only cases of already-completed, external wait, * and unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
* Returns: status upon completion
*/
	/*
	流程
	1.當任務狀態,已經執行完成,就直接返回任務狀態
	2.如果沒有執行完,則從任務數組裡取出任務並執行。
	3.如果任務順利執行完成,則設置任務狀態未NORMAL,出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL
	*/
	private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
/**
* Primary execution method for stolen tasks. Unless done, calls exec and records status if completed, but 	 * doesn't wait for completion otherwise.
* Returns: status on exit from this method
*/
	final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

Fork/Join框架的異常處理

ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException獲取異常。

public final Throwable getException() {//任務沒有完成,或者沒有拋出異常返回null
    int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() ://任務取消,返回CancellationException
            getThrowableException());
}

案例計算1+..+100

class MyTask extends RecursiveTask<Integer> {

    public static final Integer VALUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (begin - end < VALUE) {//差值小於10
            for (int i = begin; i <= end; i++) {
                result += i;
            }
        } else {
            int middle = ( begin + end) / 2;
            MyTask myTask1 = new MyTask(begin, middle);
            MyTask myTask2 = new MyTask(middle, end);
            myTask1.fork();
            myTask2.fork();
            result = myTask1.join() + myTask2.join();
        }
        return result;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyTask myTask = new MyTask(1, 100);
        ForkJoinTask<Integer> joinTask = forkJoinPool.submit(myTask);
        System.out.println(joinTask.get());
        forkJoinPool.shutdown();
    }
}

只是為了記錄自己的學習歷程,且本人水平有限,不對之處,請指正。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • TS中的類系統對比起JS完善了許多,知識點包括但不限於可訪問性、繼承類、實現介面、訪問器、泛型、抽象類。 ...
  • acwing week2 基礎演算法3總結 總結點1:雙指針演算法 //常用模版框架 for (int i = 0, j = 0; i < n; i ++ ) { while (j < i && check(i, j)) j ++ ; } 常見問題分類: (1) 對於一個序列,用兩個指針維護一段區間 ( ...
  • 十九、函數(二) 1、函數參數之接受不定量參數 1)普通函數不定量傳參用法 //接受不定量參數的函數 #include <cstdarg> //引入頭文件cstdarg int Add(unsigned count, ...) //第一個參數為參數的個數,第二個參數為三個. { int rt{}; ...
  • 小市值選股策略的核心在於通過綜合分析公司的基本面、行業定位、財務健康狀況以及市場趨勢, 來尋找那些被市場低估但具備顯著成長潛力的股票,同時也要重視風險管理和投資組合的多樣化。 今天來給大家分享下小市值策略代碼如下: # 顯式導入 BigQuant 相關 SDK 模塊 from bigdatasour ...
  • 工作中,經常遇到需要重試的場景,最簡單的方式可以用try...catch...加while迴圈來實現。那麼,有沒有統一的、優雅一點兒的處理方式呢?有的,Spring Retry就可以幫我們搞定重試問題。 關於重試,我們可以關註以下以下幾個方面: 什麼情況下去觸發重試機制 重試多少次,重試的時間間隔 ...
  • CompletableFuture非同步回調 CompletableFuture簡介 CompletableFuture被用於非同步編程,非同步通常意味著非阻塞,可以使得任務單獨允許在與主線程分離的其他線程中,並且通過回調可以在主線程中得到非同步任務的執行狀態,是否完成,和是否異常信息。 Completab ...
  • hutool工具包可以幫我們完成這件事,幾行代碼可以實現,我們提供兩種方式,壓縮本地文件和壓縮記憶體流。 壓縮本地文件 @Test public void zip(){ String entryName = "d:\\codegen\\1"; String zipFilePath = "d:\\cod ...
  • C語言迴圈結構詳解 在C語言中,迴圈結構是一種重要的控制結構,它允許我們重覆執行一段代碼,以達到特定的目的。迴圈結構可以幫助我們簡化重覆性的任務,提高代碼的效率。本篇文章將深入探討C語言中的迴圈結構,包括 while 迴圈、for 迴圈、do-while 迴圈以及迴圈中的控制語句。 1. while ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...