Fork譯為拆分,Join譯為合併Fork/Join框架的思路是把一個非常巨大的任務,拆分成若然的小任務,再由小任務繼續拆解。直至達到一個相對合理的任務粒度。然後執行獲得結果,然後將這些小任務的結果彙總,生成大任務的結果,直至彙總成最初巨大任務的結果。如下圖: 紅色箭頭代表拆分子任務。綠色箭頭代表返 ...
Fork譯為拆分,Join譯為合併
Fork/Join框架的思路是把一個非常巨大的任務,拆分成若然的小任務,再由小任務繼續拆解。直至達到一個相對合理的任務粒度。然後執行獲得結果,然後將這些小任務的結果彙總,生成大任務的結果,
直至彙總成最初巨大任務的結果。如下圖:
紅色箭頭代表拆分子任務。
綠色箭頭代表返回子任務結果
這個框架的思路聽起來,其實用傳統的線程池、多線程完全就可以解決。但是內部卻有很多小的細節(後邊會說到),再加上清晰的使用思路,讓這個框架還是在多線程併發中,占有了一席之地。
Fork/Join框架下,我們常用到三個類:(防盜連接:本文首發自http://www.cnblogs.com/jilodream/ )
RecursiveAction,子任務類,支持子任務有返回結果任務
RecursiveTask,子任務類,用於有返回結果的任務
ForkJoinPool,執行子任務的線程池。
話不多說,我們直接看代碼:
1 public class SumDemo extends RecursiveTask<Long> { 2 3 int maxLen = 800_0000; 4 5 int[] arr; 6 int start; 7 int end; 8 9 10 public SumDemo(int[] arr, int start, int end) { 11 this.arr = arr; 12 this.start = start; 13 this.end = end; 14 } 15 16 @Override 17 protected Long compute() { 18 if (end - start < maxLen) { 19 long a = sum(); 20 try { 21 //Thread.sleep(1); 22 } catch (Exception e) { 23 } 24 return a; 25 } 26 int middle = (start + end) / 2; 27 SumDemo left = new SumDemo(arr, start, middle); 28 SumDemo right = new SumDemo(arr, middle + 1, end); 29 left.fork(); 30 right.fork(); 31 //invokeAll(left,right); 32 long leftRtn = left.join(); 33 long rightRtn = right.join(); 34 return leftRtn + rightRtn; 35 } 36 37 private Long sum() { 38 System.out.println("now" + Thread.currentThread().getName() + "-start:" + start + "-end:" + end); 39 long sum = 0; 40 for (int i = start; i <= end; i++) { 41 sum += arr[i]; 42 } 43 return sum; 44 } 45 46 public static void main(String[] args) throws ExecutionException, InterruptedException { 47 int size = 30000_0000; 48 int[] arr = new int[size]; 49 Random random = new Random(0); 50 for (int i = 0; i < size; i++) { 51 arr[i] = random.nextInt(10_0000_0000); 52 } 53 long cal = 0; 54 long start = System.currentTimeMillis(); 55 for (int i = 0; i < size; i++) { 56 if (i % 800_0000 == 0) { 57 Thread.sleep(1); 58 } 59 cal += arr[i]; 60 } 61 long finish = System.currentTimeMillis(); 62 long timeCost = finish - start; 63 System.out.println("cal" + cal); 64 long start1 = System.currentTimeMillis(); 65 ForkJoinPool forkJoinPool = new ForkJoinPool(); 66 ForkJoinTask<Long> result = forkJoinPool.submit(new 67 SumDemo(arr, 0, size - 1)); 68 long rtn = result.get(); 69 long finish1 = System.currentTimeMillis(); 70 long forkJoinCost = finish1 - start1; 71 System.out.println("one thread cost" + (timeCost)); 72 System.out.println("fork join cost" + forkJoinCost); 73 } 74 }
執行的結果大概是這樣的
1 cal150000314007254036 2 nowForkJoinPool-1-worker-1-start:0-end:4687499 3 nowForkJoinPool-1-worker-3-start:187500000-end:192187499 4 nowForkJoinPool-1-worker-5-start:37500000-end:42187499 5 nowForkJoinPool-1-worker-6-start:225000000-end:229687499 6 ..... 7 nowForkJoinPool-1-worker-3-start:220312500-end:224999999 8 nowForkJoinPool-1-worker-7-start:267187500-end:271874999 9 nowForkJoinPool-1-worker-2-start:107812500-end:112499999 10 nowForkJoinPool-1-worker-4-start:281250000-end:285937499 11 nowForkJoinPool-1-worker-7-start:271875000-end:276562499 12 nowForkJoinPool-1-worker-5-start:135937500-end:140624999 13 nowForkJoinPool-1-worker-11-start:140625000-end:145312499 14 nowForkJoinPool-1-worker-6-start:276562500-end:281249999 15 nowForkJoinPool-1-worker-4-start:285937500-end:290624999 16 nowForkJoinPool-1-worker-11-start:145312500-end:149999999 17 nowForkJoinPool-1-worker-7-start:290625000-end:295312499 18 nowForkJoinPool-1-worker-4-start:295312500-end:299999999 19 one thread cost136 20 fork join cost67
線程池預設大小是根據cpu當前的可用核數來作為大小的,我們這裡是12核,但是12核居然只比單一線程用時少50%,這是挺奇怪的,這主要是由於我們Demo中的任務是連續的計算密集型任務,這種情況下單一線程的表現也很優秀,forkJoin反而由於要不斷協調線程
任務而導致會損耗性能,所以差距並不明顯。倘若放開註釋中的睡眠時間,則兩者的差距會拉開的非常大,如下:
1 one thread cost675 2 fork join cost194
代碼的思路大概是這樣的:
我們先定義一個子任務類,子任務類設置一個閾值,子任務開始任務時會判斷:
如果計算量未超過閾值呢,說明任務足夠小,我們當前子任務直接就執行計算了。
如果計算量超過閾值,說明任務比較大我們需要進行拆分,此時創建好拆分子任務,並使用fork()方法即可。拆分後的子任務,則後續使用join等待結果即可。
這樣通過Fork/Join框架實現大任務的計算就算是搞定了。(防盜連接:本文首發自http://www.cnblogs.com/jilodream/ )
那既然是線程池,是如何協調線程來計運算元任務的呢?
(1)與傳統線程池共用一個任務隊列不同的是,Fork/Join框架中,每個子任務都有一個屬於自己線程的任務隊列(但是兩者其實並不是一對一的關係,源碼很複雜),如下圖:
這樣肯定會由於任務規模、計算難度的不同,導致有些線程很快執行完了,其它線程還有很長的任務隊列,那怎麼辦呢?
Fork/Join框架會讓任務已經完成的線程,從其它任務的隊列的尾端去取任務,這樣一方面加速了任務的完成,一方面又減少了線程由於併發操作隊列可能存在的併發問題。
這種方式,我們也將它稱為“工作竊取”如下圖:
(2)Fork出來的子任務被誰執行了:
通過閱讀源碼我們可以發現,如果當前線程是線程池線程,則直接把fork出的子任務丟到當前線程的隊列中,否則會通過計算隨機的提交到其他的線程所擁有的的隊列中。由其他線程來完成。
1 public final ForkJoinTask<V> fork() { 2 Thread t; 3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 4 ((ForkJoinWorkerThread)t).workQueue.push(this); 5 else 6 ForkJoinPool.common.externalPush(this); 7 return this; 8 }
如果你覺得寫的不錯,歡迎轉載和點贊。 轉載時請保留作者署名jilodream/王若伊_恩賜解脫(博客鏈接:http://www.cnblogs.com/jilodream/