FutureTask、Fork/Join、 BlockingQueue

来源:https://www.cnblogs.com/xiangkejin/archive/2018/07/06/9275312.html
-Advertisement-
Play Games

我們之前學習創建線程有Thread和Runnable兩種方式,但是兩種方式都無法獲得執行的結果。 而Callable和Future在任務完成後得到結果。 Future是一個介面,表示一個任務的周期,並提供了相應的方法來判斷是否已經完成或者取消任務,以及獲取任務的結果和取消任務。 FutureTask ...


我們之前學習創建線程有Thread和Runnable兩種方式,但是兩種方式都無法獲得執行的結果。 而Callable和Future在任務完成後得到結果。   Future是一個介面,表示一個任務的周期,並提供了相應的方法來判斷是否已經完成或者取消任務,以及獲取任務的結果和取消任務。   FutureTask可用於非同步獲取執行結果或取消執行任務的場景。通過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,之後可以在外部通過FutureTask的get方法非同步獲取執行結果,因此,FutureTask非常適合用於耗時的計算,主線程可以在完成自己的任務後,再去獲取結果。另外,FutureTask還可以確保即使調用了多次run方法,它都只會執行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執行等。   Future介面
import lombok.extern.slf4j.Slf4j;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
@Slf4j
public class FutureExample {
 
    static class MyCallable implements Callable<String> {
 
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        //Thread.sleep(10000);
        log.info("這裡不阻塞,可以繼續非同步執行");
        String result = future.get(); //get方法會發生阻塞,如果判斷任務是否執行完成使用isDone()方法
        log.info("result:{}", result);
    }
}

 FutureTask

import lombok.extern.slf4j.Slf4j;
 
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
 
@Slf4j
public class FutureTaskExample {
 
    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });
 
        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}

Fork/Join

用於並行執行任務的框架,將大任務分割成小任務,最終將每個小任務的結果彙總得到大任務結果的框架。

思想和map/reduce非常像,Fork就是講大任務分割成小任務,Join就是合併子任務的結果。

工作竊取演算法

工作竊取(work-stealing)演算法是指某個線程從其他隊列里竊取任務來執行。工作竊取的運行流程圖如下:

fj

那麼為什麼需要使用工作竊取演算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,併為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務幹完,而其他線程對應的隊列里還有任務等待處理。幹完活的線程與其等著,不如去幫其他線程幹活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取演算法的優點是充分利用線程進行並行計算,並減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。並且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。

Fork/Join使用兩個類來完成以上兩件事情:

ForkJoinPool :它負責實現,包括我們的工作竊取演算法,它管理工作線程和任務狀態以及執行信息

ForkJoinTask:主要提供fork和join的機制

import lombok.extern.slf4j.Slf4j; 
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
 
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
 
    public static final int threshold = 2;
    private int start;
    private int end;
 
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Integer compute() {
        int sum = 0;
 
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
 
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
 
            // 等待任務執行結束合併其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
 
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
 
        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

 BlockingQueue

在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效並且線程安全的隊列類,為我們快速搭建高質量的多線程程式帶來極大的便利。
認識BlockingQueue
阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的作用大致如下圖所示:

從上圖我們可以很清楚看到,通過一個共用的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出;
常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
  先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似於排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。
  後進先出(LIFO):後插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。

      多線程環境中,通過隊列可以很容易實現數據共用,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共用。假設我們有若幹生產者線程,另外又有若幹個消費者線程。如果生產者線程需要把準備好的數據共用給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共用問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發佈以前,在多線程環境下,我們每個程式員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)
下麵兩幅圖演示了BlockingQueue的兩個常見阻塞場景:    如上圖所示:當隊列中沒有數據的情況下,消費者端的所有線程都會被自動阻塞(掛起),直到有數據放入隊列。


如上圖所示:當隊列中填滿數據的情況下,生產者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。

 

 

 1. ArrayBlockingQueue

基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變數,分別標識著隊列的頭部和尾部在數組中的位置。
  ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue

 2. LinkedBlockingQueue

       基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發數據,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的併發性能。
作為開發者,我們需要註意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

 ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在python,我們有時候需要考慮到自己設計的演算法的複雜度,首先在python中提供了一個內置的模塊,可以用來檢驗我們演算法的複雜度,接下來來看下這個模塊timeit的使用,首先我們導入模塊from timeit import Timer,接下來我們需要將我們的蘇需要的計算的模塊進行封裝成函數,再用固... ...
  • 一、java對象初始化過程 第一步,載入該類,一個java對象在初始化前會進行類載入,在JVM中生成Class對象。載入一個類會進行如下操作,下麵給出遞歸描述。(關於Class對象詳見反射 點擊這裡) 如果該類有父類,則先載入其父類。 i 初始化該類靜態成員 ii 執行該類靜態代碼塊 第二步,創建對 ...
  • Python 設置環境 當安裝好Python 後 在電腦的屬性裡面 高級語言設置 環境變數、 環境變數裡面的path 更改為Python的 樹目錄 可以從電腦直接下達命令 打開Python 基本代碼 字元串 name="我是XX" name='woshixx' ...... 加法 n1="jj" ...
  • pillow介紹 一、Image類的屬性:1、Format 2、Mode 3、Size 4、Palette 5、Info 二、類的函數:1、New 2、Open 3、Blend 4、Composite 5、Eval 6、Frombuffer 7、Fromstring 8、Merge 三、Image類 ...
  • 效果: 我沒有弄文件夾保存,因為皮膚與英雄都是一一對應,這樣子更加方便操作。 點擊下載皮膚後,會自動從官網下載一個json文件,所以出了新英雄、新皮膚軟體會自動更新。高清圖: 但是有個別新皮膚官網也沒有提供數據,找不到新皮膚下載的選擇項時,點擊影藏皮膚獲取按鈕輸入英雄名字,再點擊隱藏皮膚下載即可。 ...
  • 五、異常 異常概念總結: 練習一:異常的體系 問題: 1. 請描述異常的繼承體系 2. 請描述你對錯誤(Error)的理解 3. 請描述你對異常(Expection的理解) 4. 請描述你對運行時異常(RuntimeException)的理解 答: 1. 異常繼承體係為:異常的根類是 java.la ...
  • 對於CPU密集型的程式,可以使用multiprocessing的Process,Pool等封裝好的類,通過多進程的方式實現並行計算。但是因為進程中的通信成本比較大,對於進程之間需要大量數據交互的程式效率未必有大的提高。 4、 針對迴圈的優化 每種編程語言都會強調需要優化迴圈。當使用Python的時候 ...
  • 2018-07-06 21:06:16 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...