Java多線程開發系列之六:無限分解流----Fork/Join框架

来源:https://www.cnblogs.com/jilodream/archive/2023/06/14/17480266.html
-Advertisement-
Play Games

Fork譯為拆分,Join譯為合併Fork/Join框架的思路是把一個非常巨大的任務,拆分成若然的小任務,再由小任務繼續拆解。直至達到一個相對合理的任務粒度。然後執行獲得結果,然後將這些小任務的結果彙總,生成大任務的結果,直至彙總成最初巨大任務的結果。如下圖: 紅色箭頭代表拆分子任務。綠色箭頭代表返 ...


Fork譯為拆分,Join譯為合併
Fork/Join框架的思路是把一個非常巨大的任務,拆分成若然的小任務,再由小任務繼續拆解。直至達到一個相對合理的任務粒度。然後執行獲得結果,然後將這些小任務的結果彙總,生成大任務的結果,
直至彙總成最初巨大任務的結果。如下圖:

紅色箭頭代表拆分子任務。
綠色箭頭代表返回子任務結果
這個框架的思路聽起來,其實用傳統的線程池、多線程完全就可以解決。但是內部卻有很多小的細節(後邊會說到),再加上清晰的使用思路,讓這個框架還是在多線程併發中,占有了一席之地。
Fork/Join框架下,我們常用到三個類:(防盜連接:本文首發自http://www.cnblogs.com/jilodream/ )
RecursiveAction,子任務類,支持子任務有返回結果任務
RecursiveTask,子任務類,用於有返回結果的任務
ForkJoinPool,執行子任務的線程池。
話不多說,我們直接看代碼:

 1 public class SumDemo extends RecursiveTask<Long> {
 2 
 3     int maxLen = 800_0000;
 4 
 5     int[] arr;
 6     int start;
 7     int end;
 8 
 9 
10     public SumDemo(int[] arr, int start, int end) {
11         this.arr = arr;
12         this.start = start;
13         this.end = end;
14     }
15 
16     @Override
17     protected Long compute() {
18         if (end - start < maxLen) {
19             long a = sum();
20             try {
21                 //Thread.sleep(1);
22             } catch (Exception e) {
23             }
24             return a;
25         }
26         int middle = (start + end) / 2;
27         SumDemo left = new SumDemo(arr, start, middle);
28         SumDemo right = new SumDemo(arr, middle + 1, end);
29         left.fork();
30         right.fork();
31         //invokeAll(left,right);
32         long leftRtn = left.join();
33         long rightRtn = right.join();
34         return leftRtn + rightRtn;
35     }
36 
37     private Long sum() {
38         System.out.println("now" + Thread.currentThread().getName() + "-start:" + start + "-end:" + end);
39         long sum = 0;
40         for (int i = start; i <= end; i++) {
41             sum += arr[i];
42         }
43         return sum;
44     }
45 
46     public static void main(String[] args) throws ExecutionException, InterruptedException {
47         int size = 30000_0000;
48         int[] arr = new int[size];
49         Random random = new Random(0);
50         for (int i = 0; i < size; i++) {
51             arr[i] = random.nextInt(10_0000_0000);
52         }
53         long cal = 0;
54         long start = System.currentTimeMillis();
55         for (int i = 0; i < size; i++) {
56             if (i % 800_0000 == 0) {
57                 Thread.sleep(1);
58             }
59             cal += arr[i];
60         }
61         long finish = System.currentTimeMillis();
62         long timeCost = finish - start;
63         System.out.println("cal" + cal);
64         long start1 = System.currentTimeMillis();
65         ForkJoinPool forkJoinPool = new ForkJoinPool();
66         ForkJoinTask<Long> result = forkJoinPool.submit(new
67                 SumDemo(arr, 0, size - 1));
68         long rtn = result.get();
69         long finish1 = System.currentTimeMillis();
70         long forkJoinCost = finish1 - start1;
71         System.out.println("one thread  cost" + (timeCost));
72         System.out.println("fork join cost" + forkJoinCost);
73     }
74 }

執行的結果大概是這樣的

 1 cal150000314007254036
 2 nowForkJoinPool-1-worker-1-start:0-end:4687499
 3 nowForkJoinPool-1-worker-3-start:187500000-end:192187499
 4 nowForkJoinPool-1-worker-5-start:37500000-end:42187499
 5 nowForkJoinPool-1-worker-6-start:225000000-end:229687499
 6 .....
 7 nowForkJoinPool-1-worker-3-start:220312500-end:224999999
 8 nowForkJoinPool-1-worker-7-start:267187500-end:271874999
 9 nowForkJoinPool-1-worker-2-start:107812500-end:112499999
10 nowForkJoinPool-1-worker-4-start:281250000-end:285937499
11 nowForkJoinPool-1-worker-7-start:271875000-end:276562499
12 nowForkJoinPool-1-worker-5-start:135937500-end:140624999
13 nowForkJoinPool-1-worker-11-start:140625000-end:145312499
14 nowForkJoinPool-1-worker-6-start:276562500-end:281249999
15 nowForkJoinPool-1-worker-4-start:285937500-end:290624999
16 nowForkJoinPool-1-worker-11-start:145312500-end:149999999
17 nowForkJoinPool-1-worker-7-start:290625000-end:295312499
18 nowForkJoinPool-1-worker-4-start:295312500-end:299999999
19 one thread cost136
20 fork join cost67

線程池預設大小是根據cpu當前的可用核數來作為大小的,我們這裡是12核,但是12核居然只比單一線程用時少50%,這是挺奇怪的,這主要是由於我們Demo中的任務是連續的計算密集型任務,這種情況下單一線程的表現也很優秀,forkJoin反而由於要不斷協調線程

任務而導致會損耗性能,所以差距並不明顯。倘若放開註釋中的睡眠時間,則兩者的差距會拉開的非常大,如下:

1 one thread  cost675
2 fork join cost194

代碼的思路大概是這樣的:

我們先定義一個子任務類,子任務類設置一個閾值,子任務開始任務時會判斷:
如果計算量未超過閾值呢,說明任務足夠小,我們當前子任務直接就執行計算了。
如果計算量超過閾值,說明任務比較大我們需要進行拆分,此時創建好拆分子任務,並使用fork()方法即可。拆分後的子任務,則後續使用join等待結果即可。
這樣通過Fork/Join框架實現大任務的計算就算是搞定了。(防盜連接:本文首發自http://www.cnblogs.com/jilodream/ )

那既然是線程池,是如何協調線程來計運算元任務的呢?

(1)與傳統線程池共用一個任務隊列不同的是,Fork/Join框架中,每個子任務都有一個屬於自己線程的任務隊列(但是兩者其實並不是一對一的關係,源碼很複雜),如下圖:

這樣肯定會由於任務規模、計算難度的不同,導致有些線程很快執行完了,其它線程還有很長的任務隊列,那怎麼辦呢?
Fork/Join框架會讓任務已經完成的線程,從其它任務的隊列的尾端去取任務,這樣一方面加速了任務的完成,一方面又減少了線程由於併發操作隊列可能存在的併發問題。
這種方式,我們也將它稱為“工作竊取”如下圖:

(2)Fork出來的子任務被誰執行了:
通過閱讀源碼我們可以發現,如果當前線程是線程池線程,則直接把fork出的子任務丟到當前線程的隊列中,否則會通過計算隨機的提交到其他的線程所擁有的的隊列中。由其他線程來完成。

1     public final ForkJoinTask<V> fork() {
2         Thread t;
3         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
4             ((ForkJoinWorkerThread)t).workQueue.push(this);
5         else
6             ForkJoinPool.common.externalPush(this);
7         return this;
8     }

 

如果你覺得寫的不錯,歡迎轉載和點贊。 轉載時請保留作者署名jilodream/王若伊_恩賜解脫(博客鏈接:http://www.cnblogs.com/jilodream/


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第14面: > 面試官:在C++中,有哪些可執行體? > > 二師兄:可執行體? > > 面試官:也就是可調用對象。 > > 二師兄:讓我想一想。函數、函數指針、類的靜態方法、類的成員方法、仿函數、lambda表達式。 > > 面試官:能說一說他們之 ...
  • Set 介面是 Collection 介面的一個子介面。Set 介面的實現類不會包含重覆的元素,並且最多只能有一個 null 元素。當嘗試添加重覆元素時,添加操作將被忽略。Set 介面取出元素的順序和添加元素的順序不一致(但是每次取出的順序是固定的),即無法通過索引訪問 Set 中的元素。 ...
  • 函數是帶名字的代碼塊,用於完成具體的工作,無需反覆編寫完成該工作的代碼。之前我們接觸過print函數,數據類型轉換中的int函數、str函數,還有列表中的append函數、pop函數、remove函數,以及字典中的keys函數、values函數等等,其實在正式學習函數之前,我們已經接觸了函數,只不過 ...
  • 一、RocketMQ 核心的四大組件: Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支持多種負載平衡模式發送消息。 Consumer:消息消費者 ...
  • 本文主要講解了一致性哈希演算法的原理以及其存在的數據傾斜的問題,然後引出解決數據傾斜問題的方法,最後分析一致性哈希演算法在Dubbo中的使用。通過這篇文章,可以瞭解到一致性哈希演算法的原理以及這種演算法存在的問題和解決方案。 ...
  • # Go 語言之 SQLX 高級操作 sqlx.In ## sqlx.In 介紹 `sqlx` is a package for Go which provides a set of extensions on top of the excellent built-in `database/sql` ...
  • # 簡介 Scala是一種多範式的編程語言(多範式:多種編程方法的意思。有面向過程、面向對象、泛型、函數式四種程式設計方法),其設計的初衷是要集成面向對象編程和函數式編程的各種特性。Scala運行於Java平臺(Java虛擬機),並相容現有的Java程式 > 官網:https://www.scala ...
  • 基礎使用 首先引入依賴 <!-- redis依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </depende ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...