【高併發】什麼是ForkJoin?看這一篇就夠了!

来源:https://www.cnblogs.com/binghe001/archive/2020/04/12/12683253.html
-Advertisement-
Play Games

寫在前面 在JDK中,提供了這樣一種功能:它能夠將複雜的邏輯拆分成一個個簡單的邏輯來並行執行,待每個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果數據。有點像Hadoop中的MapReduce。 ForkJoin是由JDK1.7之後提供的多線程併發處理框架。ForkJoin框架的基本 ...


寫在前面

在JDK中,提供了這樣一種功能:它能夠將複雜的邏輯拆分成一個個簡單的邏輯來並行執行,待每個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果數據。有點像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之後提供的多線程併發處理框架。ForkJoin框架的基本思想是分而治之。什麼是分而治之?分而治之就是將一個複雜的計算,按照設定的閾值分解成多個計算,然後將各個計算結果進行彙總。相應的,ForkJoin將複雜的計算當做一個任務,而分解的多個計算則是當做一個個子任務來並行執行。

Java併發編程的發展

對於Java語言來說,生來就支持多線程併發編程,在併發編程領域也是在不斷發展的。Java在其發展過程中對併發編程的支持越來越完善也正好印證了這一點。

  • Java 1 支持thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join庫。
  • Java 8 加入了 parallel streams。

併發與並行

併發和並行在本質上還是有所區別的。

併發

併發指的是在同一時刻,只有一個線程能夠獲取到CPU執行任務,而多個線程被快速的輪換執行,這就使得在巨集觀上具有多個線程同時執行的效果,併發不是真正的同時執行,併發可以使用下圖表示。

001

並行

並行指的是無論何時,多個線程都是在多個CPU核心上同時執行的,是真正的同時執行。

002

分治法

基本思想

把一個規模大的問題劃分為規模較小的子問題,然後分而治之,最後合併子問題的解得到原問題的解。

步驟

①分割原問題;

②求解子問題;

③合併子問題的解為原問題的解。

我們可以使用如下偽代碼來表示這個步驟。

if(任務很小){
    直接計算得到結果
}else{
    分拆成N個子任務
    調用子任務的fork()進行計算
    調用子任務的join()合併計算結果
}

在分治法中,子問題一般是相互獨立的,因此,經常通過遞歸調用演算法來求解子問題。

典型應用

  • 二分搜索
  • 大整數乘法
  • Strassen矩陣乘法
  • 棋盤覆蓋
  • 合併排序
  • 快速排序
  • 線性時間選擇
  • 漢諾塔

ForkJoin並行處理框架

ForkJoin框架概述

Java 1.7 引入了一種新的併發框架—— Fork/Join Framework,主要用於實現“分而治之”的演算法,特別是分治之後遞歸調用的函數。

ForkJoin框架的本質是一個用於並行執行任務的框架, 能夠把一個大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務的計算結果。在Java中,ForkJoin框架與ThreadPool共存,並不是要替換ThreadPool

其實,在Java 8中引入的並行流計算,內部就是採用的ForkJoinPool來實現的。例如,下麵使用並行流實現列印數組元組的程式。

public class SumArray {
    public static void main(String[] args){
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        numberList.parallelStream().forEach(System.out::println);
    }
}

這段代碼的背後就使用到了ForkJoinPool。

說到這裡,可能有讀者會問:可以使用線程池的ThreadPoolExecutor來實現啊?為什麼要使用ForkJoinPool啊?ForkJoinPool是個什麼鬼啊?! 接下來,我們就來回答這個問題。

ForkJoin框架原理

ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入指定的線程數量,那麼當前電腦可用的CPU數量會被設置為線程數量作為預設值。

ForkJoinPool主要使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool能夠使用相對較少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會做出同樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概200萬+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務隊列中再添加一個任務併在等待該任務完成之後再繼續執行。而使用ForkJoinPool就能夠解決這個問題,它就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什麼差異呢?

首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關係的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個線程,很顯然這是不可行的,也是很不合理的!!

工作竊取演算法

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,併為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務幹完,而其他線程對應的隊列里還有任務等待處理。幹完活的線程與其等著,不如去幫其他線程幹活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取演算法的優點:
充分利用線程進行並行計算,並減少了線程間的競爭。

工作竊取演算法的缺點:
在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。並且該演算法會消耗更多的系統資源,比如創建多個線程和多個雙端隊列。

Fork/Join框架局限性:

對於Fork/Join框架而言,當一個任務正在等待它使用Join操作創建的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,並開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運行時間來提高應用程式的性能。為了實現這個目標,Fork/Join框架執行的任務有一些局限性。

(1)任務只能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join框架中,使任務進行了睡眠,那麼,在睡眠期間內,正在執行這個任務的工作線程將不會執行其他任務了。
(2)在Fork/Join框架中,所拆分的任務不應該去執行IO操作,比如:讀寫數據文件。
(3)任務不能拋出檢查異常,必須通過必要的代碼來出來這些異常。

ForkJoin框架的實現

ForkJoin框架中一些重要的類如下所示。

004

ForkJoinPool 框架中涉及的主要類如下所示。

1.ForkJoinPool類

實現了ForkJoin框架中的線程池,由類圖可以看出,ForkJoinPool類實現了線程池的Executor介面。

我們也可以從下圖中看出ForkJoinPool的類圖關係。

005

其中,可以使用Executors.newWorkStealPool()方法創建ForkJoinPool。

ForkJoinPool中提供瞭如下提交任務的方法。

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)

2.ForkJoinWorkerThread類

實現ForkJoin框架中的線程。

3.ForkJoinTask

ForkJoinTask封裝了數據及其相應的計算,並且支持細粒度的數據並行。ForkJoinTask比線程要輕量,ForkJoinPool中少量工作線程能夠運行大量的ForkJoinTask。

ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現任務的分拆與合併。

fork()方法類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作隊列中。跟Thread.join()方法不同,ForkJoinTask的join()方法並不簡單的阻塞線程,而是利用工作線程運行其他任務,當一個工作線程中調用join(),它將處理其他任務,直到註意到目標子任務已經完成。

我們可以使用下圖來表示這個過程。

006

ForkJoinTask有3個子類:

007

  • RecursiveAction:無返回值的任務。
  • RecursiveTask:有返回值的任務。
  • CountedCompleter:完成任務後將觸發其他任務。

4.RecursiveTask

有返回結果的ForkJoinTask實現Callable。

5.RecursiveAction類

無返回結果的ForkJoinTask實現Runnable。

6.CountedCompleter

在任務完成執行後會觸發執行一個自定義的鉤子函數。

ForkJoin示常式序

package io.binghe.concurrency.example.aqs;
 
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
 
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
 
            // 等待任務執行結束合併其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
 
        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

寫在最後

如果覺得文章對你有點幫助,請微信搜索並關註「 冰河技術 」微信公眾號,跟冰河學習高併發編程技術。

最後,附上併發編程需要掌握的核心技能知識圖,祝大家在學習併發編程時,少走彎路。

sandahexin_20200322


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • thrift類似java裡面的socket和sockchannel中server和client通信 thrift最重要的是跨語言,裡面提供了序列化和反序列化、json和實體對象等方法 Apache Thrift軟體框架(用於可擴展的跨語言服務開發)將軟體堆棧與代碼生成引擎結合在一起,以構建可在C++ ...
  • 作為一個碼農,你知道如何啟動一個java線程嗎? 啟動線程 public class PrintThread extends Thread { public void run() { System.out.println("我是線程! 繼承自Thread"); } public static voi ...
  • 全民閱讀的時代已經來臨,目前使用讀書軟體的用戶數2.1億,日活躍用戶超過500萬,其中19-35歲年輕用戶占比超過60%,本科及以上學歷用戶占比高達80%,北上廣深及其他省會城市/直轄市用戶占比超過80%。**本人習慣使用微信讀書,為了方便整理書籍和導出筆記,便開發了這個小工具。** ...
  • 前言 在【JAVA進階架構師指南】系列二和三中,我們瞭解了JVM的記憶體模型以及類載入機制,其中在記憶體模型中,我們說到,從線程角度來說,JVM分為線程私有的區域(虛擬機棧/本地方法棧/程式計數器)和線程公有區域(方法區和java堆),其中線程私有區域記憶體隨著線程的結束而跟著被回收,GC主要關註的是堆和 ...
  • 靈魂拷問:在不重啟服務的前提下,如何讓配置修改生效的呢?有什麼奇技淫巧嗎? 靈魂拷問:在 Java 項目中,總能看到以 .properties 為尾碼的文件蹤影,這類配置文件是怎麼載入的呢? 項目研發過程中,總會遇到一些經常改變的參數,比如要連接的資料庫的連接地址、名稱、用戶名、密碼;再比如訪問三方 ...
  • 1、什麼是字元串? Go語言中字元串是一個位元組切片。把內容放在雙引號""之間,我們可以創建一個字元串,讓我們來看一下創建並列印字元串的簡單示例。 package main import ( "fmt" ) func main() { str := "hello golang" fmt.Println ...
  • 一、Java註解 1.引入起始:Java5.0開始引入; 2.該功能可用於類、構造方法、成員變數、方法、參數 3.註解功能的影響範圍:不影響程式的正常執行,但是會對編譯器等輔助工具產生影響。 4.定義:註解又可以稱為標註,是程式的元數據,也是程式代碼的標記。 5.獲取方式:在編譯、載入類和運行時。 ...
  • PHP常用設計模式詳解 單例模式: php交流群:159789818 特性:單例類只能有一個實例 類內__construct構造函數私有化,防止new實例 類內__clone私有化,防止複製對象 設置一個$instance私有靜態屬性,為了保存當前類的實例 設置一個getInstance公有方法,為 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...