死磕 java線程系列之ForkJoinPool深入解析

来源:https://www.cnblogs.com/tong-yuan/archive/2019/11/09/11824018.html
-Advertisement-
Play Games

(手機橫屏看源碼更方便) 註:java源碼分析部分如無特殊說明均基於 java8 版本。 註:本文基於ForkJoinPool分治線程池類。 簡介 隨著在硬體上多核處理器的發展和廣泛使用,併發編程成為程式員必須掌握的一門技術,在面試中也經常考查面試者併發相關的知識。 今天,我們就來看一道面試題: 如 ...


forkjoinpool

(手機橫屏看源碼更方便)


註:java源碼分析部分如無特殊說明均基於 java8 版本。

註:本文基於ForkJoinPool分治線程池類。

簡介

隨著在硬體上多核處理器的發展和廣泛使用,併發編程成為程式員必須掌握的一門技術,在面試中也經常考查面試者併發相關的知識。

今天,我們就來看一道面試題:

如何充分利用多核CPU,計算很大數組中所有整數的和?

剖析

  • 單線程相加?

我們最容易想到就是單線程相加,一個for迴圈搞定。

  • 線程池相加?

如果進一步優化,我們會自然而然地想到使用線程池來分段相加,最後再把每個段的結果相加。

  • 其它?

Yes,就是我們今天的主角——ForkJoinPool,但是它要怎麼實現呢?似乎沒怎麼用過哈^^

三種實現

OK,剖析完了,我們直接來看三種實現,不墨跡,直接上菜。

/**
 * 計算1億個整數的和
 */
public class ForkJoinPoolTest01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 構造數據
        int length = 100000000;
        long[] arr = new long[length];
        for (int i = 0; i < length; i++) {
            arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
        }
        // 單線程
        singleThreadSum(arr);
        // ThreadPoolExecutor線程池
        multiThreadSum(arr);
        // ForkJoinPool線程池
        forkJoinSum(arr);

    }

    private static void singleThreadSum(long[] arr) {
        long start = System.currentTimeMillis();

        long sum = 0;
        for (int i = 0; i < arr.length; i++) {
            // 模擬耗時,本文由公從號“彤哥讀源碼”原創
            sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);
        }

        System.out.println("sum: " + sum);
        System.out.println("single thread elapse: " + (System.currentTimeMillis() - start));

    }

    private static void multiThreadSum(long[] arr) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        int count = 8;
        ExecutorService threadPool = Executors.newFixedThreadPool(count);
        List<Future<Long>> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int num = i;
            // 分段提交任務
            Future<Long> future = threadPool.submit(() -> {
                long sum = 0;
                for (int j = arr.length / count * num; j < (arr.length / count * (num + 1)); j++) {
                    try {
                        // 模擬耗時
                        sum += (arr[j]/3*3/3*3/3*3/3*3/3*3);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                return sum;
            });
            list.add(future);
        }

        // 每個段結果相加
        long sum = 0;
        for (Future<Long> future : list) {
            sum += future.get();
        }

        System.out.println("sum: " + sum);
        System.out.println("multi thread elapse: " + (System.currentTimeMillis() - start));
    }

    private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        // 提交任務
        ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(new SumTask(arr, 0, arr.length));
        // 獲取結果
        Long sum = forkJoinTask.get();

        forkJoinPool.shutdown();

        System.out.println("sum: " + sum);
        System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));
    }

    private static class SumTask extends RecursiveTask<Long> {
        private long[] arr;
        private int from;
        private int to;

        public SumTask(long[] arr, int from, int to) {
            this.arr = arr;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 小於1000的時候直接相加,可靈活調整
            if (to - from <= 1000) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    // 模擬耗時
                    sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);
                }
                return sum;
            }

            // 分成兩段任務,本文由公從號“彤哥讀源碼”原創
            int middle = (from + to) / 2;
            SumTask left = new SumTask(arr, from, middle);
            SumTask right = new SumTask(arr, middle, to);

            // 提交左邊的任務
            left.fork();
            // 右邊的任務直接利用當前線程計算,節約開銷
            Long rightResult = right.compute();
            // 等待左邊計算完畢
            Long leftResult = left.join();
            // 返回結果
            return leftResult + rightResult;
        }
    }
}

彤哥偷偷地告訴你,實際上計算1億個整數相加,單線程是最快的,我的電腦大概是100ms左右,使用線程池反而會變慢。

所以,為了演示ForkJoinPool的牛逼之處,我把每個數都/3*3/3*3/3*3/3*3/3*3了一頓操作,用來模擬計算耗時。

來看結果:

sum: 107352457433800662
single thread elapse: 789
sum: 107352457433800662
multi thread elapse: 228
sum: 107352457433800662
fork join elapse: 189

可以看到,ForkJoinPool相對普通線程池還是有很大提升的。

問題:普通線程池能否實現ForkJoinPool這種計算方式呢,即大任務拆中任務,中任務拆小任務,最後再彙總?

forkjoinpool

你可以試試看(-᷅_-᷄)

OK,下麵我們正式進入ForkJoinPool的解析。

分治法

  • 基本思想

把一個規模大的問題劃分為規模較小的子問題,然後分而治之,最後合併子問題的解得到原問題的解。

  • 步驟

(1)分割原問題:

(2)求解子問題:

(3)合併子問題的解為原問題的解。

在分治法中,子問題一般是相互獨立的,因此,經常通過遞歸調用演算法來求解子問題。

  • 典型應用場景

(1)二分搜索

(2)大整數乘法

(3)Strassen矩陣乘法

(4)棋盤覆蓋

(5)歸併排序

(6)快速排序

(7)線性時間選擇

(8)漢諾塔

ForkJoinPool繼承體系

ForkJoinPool是 java 7 中新增的線程池類,它的繼承體系如下:

forkjoinpool

ForkJoinPool和ThreadPoolExecutor都是繼承自AbstractExecutorService抽象類,所以它和ThreadPoolExecutor的使用幾乎沒有多少區別,除了任務變成了ForkJoinTask以外。

這裡又運用到了一種很重要的設計原則——開閉原則——對修改關閉,對擴展開放。

可見整個線程池體系一開始的介面設計就很好,新增一個線程池類,不會對原有的代碼造成干擾,還能利用原有的特性。

ForkJoinTask

兩個主要方法

  • fork()

fork()方法類似於線程的Thread.start()方法,但是它不是真的啟動一個線程,而是將任務放入到工作隊列中。

  • join()

join()方法類似於線程的Thread.join()方法,但是它不是簡單地阻塞線程,而是利用工作線程運行其它任務。當一個工作線程中調用了join()方法,它將處理其它任務,直到註意到目標子任務已經完成了。

三個子類

  • RecursiveAction

無返回值任務。

  • RecursiveTask

有返回值任務。

  • CountedCompleter

無返回值任務,完成任務後可以觸發回調。

ForkJoinPool內部原理

ForkJoinPool內部使用的是“工作竊取”演算法實現的。

forkjoinpool

(1)每個工作線程都有自己的工作隊列WorkQueue;

(2)這是一個雙端隊列,它是線程私有的;

(3)ForkJoinTask中fork的子任務,將放入運行該任務的工作線程的隊頭,工作線程將以LIFO的順序來處理工作隊列中的任務;

(4)為了最大化地利用CPU,空閑的線程將從其它線程的隊列中“竊取”任務來執行;

(5)從工作隊列的尾部竊取任務,以減少競爭;

(6)雙端隊列的操作:push()/pop()僅在其所有者工作線程中調用,poll()是由其它線程竊取任務時調用的;

(7)當只剩下最後一個任務時,還是會存在競爭,是通過CAS來實現的;

forkjoinpool

ForkJoinPool最佳實踐

(1)最適合的是計算密集型任務,本文由公從號“彤哥讀源碼”原創;

(2)在需要阻塞工作線程時,可以使用ManagedBlocker;

(3)不應該在RecursiveTask的內部使用ForkJoinPool.invoke()/invokeAll();

總結

(1)ForkJoinPool特別適合於“分而治之”演算法的實現;

(2)ForkJoinPool和ThreadPoolExecutor是互補的,不是誰替代誰的關係,二者適用的場景不同;

(3)ForkJoinTask有兩個核心方法——fork()和join(),有三個重要子類——RecursiveAction、RecursiveTask和CountedCompleter;

(4)ForkjoinPool內部基於“工作竊取”演算法實現;

(5)每個線程有自己的工作隊列,它是一個雙端隊列,自己從隊列頭存取任務,其它線程從尾部竊取任務;

(6)ForkJoinPool最適合於計算密集型任務,但也可以使用ManagedBlocker以便用於阻塞型任務;

(7)RecursiveTask內部可以少調用一次fork(),利用當前線程處理,這是一種技巧;

彩蛋

ManagedBlocker怎麼使用?

答:ManagedBlocker相當於明確告訴ForkJoinPool框架要阻塞了,ForkJoinPool就會啟另一個線程來運行任務,以最大化地利用CPU。

請看下麵的例子,自己琢磨哈^^。

/**
 * 斐波那契數列
 * 一個數是它前面兩個數之和
 * 1,1,2,3,5,8,13,21
 */
public class Fibonacci {

    public static void main(String[] args) {
        long time = System.currentTimeMillis();
        Fibonacci fib = new Fibonacci();
        int result = fib.f(1_000).bitCount();
        time = System.currentTimeMillis() - time;
        System.out.println("result,本文由公從號“彤哥讀源碼”原創 = " + result);
        System.out.println("test1_000() time = " + time);
    }

    public BigInteger f(int n) {
        Map<Integer, BigInteger> cache = new ConcurrentHashMap<>();
        cache.put(0, BigInteger.ZERO);
        cache.put(1, BigInteger.ONE);
        return f(n, cache);
    }

    private final BigInteger RESERVED = BigInteger.valueOf(-1000);

    public BigInteger f(int n, Map<Integer, BigInteger> cache) {
        BigInteger result = cache.putIfAbsent(n, RESERVED);
        if (result == null) {

            int half = (n + 1) / 2;

            RecursiveTask<BigInteger> f0_task = new RecursiveTask<BigInteger>() {
                @Override
                protected BigInteger compute() {
                    return f(half - 1, cache);
                }
            };
            f0_task.fork();

            BigInteger f1 = f(half, cache);
            BigInteger f0 = f0_task.join();

            long time = n > 10_000 ? System.currentTimeMillis() : 0;
            try {

                if (n % 2 == 1) {
                    result = f0.multiply(f0).add(f1.multiply(f1));
                } else {
                    result = f0.shiftLeft(1).add(f1).multiply(f1);
                }
                synchronized (RESERVED) {
                    cache.put(n, result);
                    RESERVED.notifyAll();
                }
            } finally {
                time = n > 10_000 ? System.currentTimeMillis() - time : 0;
                if (time > 50)
                    System.out.printf("f(%d) took %d%n", n, time);
            }
        } else if (result == RESERVED) {
            try {
                ReservedFibonacciBlocker blocker = new ReservedFibonacciBlocker(n, cache);
                ForkJoinPool.managedBlock(blocker);
                result = blocker.result;
            } catch (InterruptedException e) {
                throw new CancellationException("interrupted");
            }

        }
        return result;
        // return f(n - 1).add(f(n - 2));
    }

    private class ReservedFibonacciBlocker implements ForkJoinPool.ManagedBlocker {
        private BigInteger result;
        private final int n;
        private final Map<Integer, BigInteger> cache;

        public ReservedFibonacciBlocker(int n, Map<Integer, BigInteger> cache) {
            this.n = n;
            this.cache = cache;
        }

        @Override
        public boolean block() throws InterruptedException {
            synchronized (RESERVED) {
                while (!isReleasable()) {
                    RESERVED.wait();
                }
            }
            return true;
        }

        @Override
        public boolean isReleasable() {
            return (result = cache.get(n)) != RESERVED;
        }
    }
}

歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • var myDate = new Date();Date.now(); // 獲取當前時間 1573200670754 時間戳精確到毫秒 myDate.getYear(); //獲取當前年份(2位) myDate.getFullYear(); //獲取完整的年份(4位,2017) myDate.ge ...
  • 本節說一下DOM操作模塊里的刪除元素模塊,該模塊用於刪除DOM里的某個節點,也可以理解為將該節點從DOM樹中卸載掉,如果該節點有綁定事件,我們可以選擇保留或刪除這些事件,刪除元素的介面有如下三個: empty() ;移除匹配元素的所有子元素。 ;先移除所有後代元素關聯的數據和事件,以避免記憶體泄漏。然 ...
  • CSS的引入 在早期,如果要去定義一個H1的標題的顏色、字體、大小和其他的顯示特征,就需要用到HTML中的font或其他樣式的指令,H1只是一個結構指令所以光有它是不夠的。因此如果有多個標簽要去進行處理,就會造成樣式的重覆,後期維護的困難。 那CSS的出現就解決了這一類的問題,CSS(Cascadi ...
  • GoF設計模式一共有23個。一般可以按目的和作用範圍來進行劃分,具體劃分方法如下: 第一,這些模式按目的(即完成什麼樣任務)來劃分為創建型、結構型和行為型這三種模式: 創建型:用來創建對象。單例、原型、抽象工廠、建造者、工廠方法這五個都屬於這一分類。這種類別起到了將對象的創建與其使用進行分離解耦。 ...
  • 重構改善既有代碼 第一次做某件事情的時候儘管去做,第二次做類似的事會產生反感,第三次再做類似的事,你就應該重構。 小型函數優美動人 一個類最好是常量類,任何的改變都是調用該類本身的介面實現。 0 壞代碼的味道 1、重覆代碼 Duplicated Code 同一類中的兩個函數含有相同的表達式,提取到方 ...
  • 0 簡單工廠模式 0.0 簡單工廠模式動機 考慮一個簡單的軟體應用場景,一個軟體系統可提供多個外觀不同按鈕(如圓形、矩形按、菱形按鈕等), 這些按鈕都源自同一個父類,不過在繼承父類後不同的子類修改了部分屬性從而使得它們可呈現不同外觀,如果希望在使用這些按鈕時,不需要知道這些具體按鈕類的名字,只需要知 ...
  • 要想理解持續集成和持續部署,先要瞭解它的部分組成,以及各個組成部分之間的關係。下麵這張圖是我見過的最簡潔、清晰的持續部署和集成的關係圖。 "圖片來源" 持續部署: 如圖所示,開發的流程是這樣的: 程式員從源碼庫(Source Control)中下載源代碼,編寫程式,完成後提交代碼到源碼庫,持續集成( ...
  • 本解決方案是一個Windows應用編程框架和UI庫,包括四個項目: Ligg.EasyWinForm是一個Winform應用編程框架和UI庫。通過這個該框架,不需任何代碼,通過XML配置文件,搭建任意複雜的Windows應用界面,以類似Execel公式的方式實現基本的過程式控制制(賦值、條件判斷、迴圈、 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...