摘要:Fork/Join框架位於J.U.C(java.util.concurrent)中,是Java7中提供的用於執行並行任務的框架,其可以將大任務分割成若幹個小任務,最終彙總每個小任務的結果後得到最終結果。 本文分享自華為雲社區《如何使用Java7提供的Fork/Join框架實現高併發程式?》,作 ...
摘要:Fork/Join框架位於J.U.C(java.util.concurrent)中,是Java7中提供的用於執行並行任務的框架,其可以將大任務分割成若幹個小任務,最終彙總每個小任務的結果後得到最終結果。
本文分享自華為雲社區《如何使用Java7提供的Fork/Join框架實現高併發程式?》,作者:冰 河。
Fork/Join框架位於J.U.C(java.util.concurrent)中,是Java7中提供的用於執行並行任務的框架,其可以將大任務分割成若幹個小任務,最終彙總每個小任務的結果後得到最終結果。基本思想和Hadoop的MapReduce思想類似。
主要採用的是工作竊取演算法(某個線程從其他隊列里竊取任務來執行),並行分治計算中的一種Work-stealing策略
為什麼需要使用工作竊取演算法呢?
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,併為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務幹完,而其他線程對應的隊列里還有任務等待處理。幹完活的線程與其等著,不如去幫其他線程幹活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工作竊取演算法的優點
充分利用線程進行並行計算,並減少了線程間的競爭
工作竊取演算法的缺點
在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。並且該演算法會消耗更多的系統資源,比如創建多個線程和多個雙端隊列。
Fork/Join框架局限性
對於Fork/Join框架而言,當一個任務正在等待它使用Join操作創建的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,並開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運行時間來提高應用程式的性能。為了實現這個目標,Fork/Join框架執行的任務有一些局限性,如下所示。
- 任務只能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join框架中,使任務進行了睡眠,那麼,在睡眠期間內,正在執行這個任務的工作線程將不會執行其他任務了。
- 在Fork/Join框架中,所拆分的任務不應該去執行IO操作,比如:讀寫數據文件
- 任務不能拋出檢查異常,必須通過必要的代碼來出來這些異常
Fork/Join框架的核心類
Fork/Join框架的核心是兩個類:ForkJoinPool和ForkJoinTask。ForkJoinPool負責實現工作竊取演算法、管理工作線程、提供關於任務的狀態以及執行信息。ForkJoinTask主要提供在任務中執行Fork和Join操作的機制。
示例代碼
示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; @Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任務足夠小就計算任務 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大於閾值,就分裂成兩個子任務計算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待任務執行結束合併其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一個計算任務,計算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //執行一個任務 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }