學習Java 8 Stream Api (5) - Stream 周邊及其他

来源:https://www.cnblogs.com/todev/archive/2020/06/13/13121280.html
-Advertisement-
Play Games

經過前面 4 篇內容的學習,我們已經掌握了 Stream 大部分的知識,本節我們針對之前 Stream 未涉及的內容及周邊知識點做個補充。 ...


Stream API

經過前面 4 篇內容的學習,我們已經掌握了 Stream 大部分的知識,本節我們針對之前 Stream 未涉及的內容及周邊知識點做個補充。

Fork/Join 框架

fork/join 框架是 Java 7 中引入的新特性 ,它是一個工具,通過 「 分而治之 」 的方法嘗試將所有可用的處理器內核使用起來幫助加速並行處理。

在實際使用過程中,這種 「 分而治之 」的方法意味著框架首先要 fork ,遞歸地將任務分解為較小的獨立子任務,直到它們足夠簡單以便非同步執行。然後,join 部分開始工作,將所有子任務的結果遞歸地連接成單個結果,或者在返回 void 的任務的情況下,程式只是等待每個子任務執行完畢。

為了提供有效的並行執行,fork/join 框架使用了一個名為 ForkJoinPool 的線程池,用於管理 ForkJoinWorkerThread 類型的工作線程。

Fork/Join 優點

Fork/Join 架構使用了一種名為工作竊取( work-stealing )演算法來平衡線程的工作負載。

簡單來說,工作竊取演算法就是空閑的線程試圖從繁忙線程的隊列中竊取工作。

預設情況下,每個工作線程從其自己的雙端隊列中獲取任務。但如果自己的雙端隊列中的任務已經執行完畢,雙端隊列為空時,工作線程就會從另一個忙線程的雙端隊列尾部或全局入口隊列中獲取任務,因為這是最大概率可能找到工作的地方。

這種方法最大限度地減少了線程競爭任務的可能性。它還減少了工作線程尋找任務的次數,因為它首先在最大可用的工作塊上工作。

Fork/Join 使用

ForkJoinTask 是 ForkJoinPool 線程之中執行的任務的基本類型。我們日常使用時,一般不直接使用 ForkJoinTask ,而是擴展它的兩個子類中的任意一個

  1. 任務不返回結果 ( 返回 void ) 的 RecursiveAction
  2. 返回值的任務的 RecursiveTask

這兩個類都有一個抽象方法 compute() ,用於定義任務的邏輯。

我們所要做的,就是繼承任意一個類,然後實現 compute() 方法,步驟如下:

  1. 創建一個表示工作總量的對象
  2. 選擇合適的閾值
  3. 定義分割工作的方法
  4. 定義執行工作的方法

如下是使用 Fork/Join 方式實現的1至1000006587的 Fork/Join 方式累加,我們和單線程的迴圈累加做了下對比,在 Intel i5-4460 的 PC 機器下,單線程執行使用了 650 ms,使用了 Fork/Join 方式執行 210 ms,優化效果挺明顯。


public class NumberAddTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 10_0000;
    private final int begin;
    private final int end;

    public NumberAddTask(int begin, int end) {
        super();
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - begin <= THRESHOLD) {
            long sum = 0;
            for(int i = begin; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = (begin + end) /2;
        NumberAddTask t1 = new NumberAddTask(begin, mid);
        NumberAddTask t2 = new NumberAddTask(mid + 1,  end);
        ForkJoinTask.invokeAll(t1, t2);
        return t1.join() + t2.join();
    }
}

// 1至1000006587的Fork/Join方式累加
@Test
public void testAddForkJoin() {
    long begin = System.currentTimeMillis();
    int n = 10_0000_6587;
    ForkJoinPool pool = ForkJoinPool.commonPool();
    log.info("1 + 2 + ... {} = {}", n, pool.invoke(new NumberAddTask(1, n)));
    long end = System.currentTimeMillis();
    log.info("ForkJoin方式執行時間:{}ms", end - begin);
}

// 1至1000006587的單線程累加
@Test
public void testAddFunction() {
    long begin = System.currentTimeMillis();
    int n = 10_0000_6587;
    long sum = 0;
    for(int i = 1; i <= n; i++ ) {
        sum += i;
    }
    log.info("1 + 2 + ... {} = {}", n, sum);
    long end = System.currentTimeMillis();
    log.info("函數方式執行時間:{}ms", end - begin);
}

Fork/Join 使用場景

我使用 Java 8 官方 Api 中 RecursiveTask 的示例,創建了一個計算斐波那契數列的 Fork/Join 實現,雖然官方也提到了這是愚蠢的實現斐波那契數列方法,甚至效果還不如單線程的遞歸計算,但是這也說明瞭 Fork/Join 並非萬能的。

@Test
public void testForkJoin() {
    // 執行f(40) = 102334155使用3411ms
    // 執行f(80) 2個多小時,無法計算出結果
    long begin = System.currentTimeMillis();
    int n = 40;
    ForkJoinPool pool = ForkJoinPool.commonPool();
    log.info("ForkJoinPool初始化時間:{}ms", System.currentTimeMillis() - begin);
    log.info("斐波那契數列f({}) = {}", n, pool.invoke(new FibonacciTask(n)));
    long end = System.currentTimeMillis();
    log.info("ForkJoin方式執行時間:{}ms", end - begin);
}

// 不用遞歸計算斐波那契數列反而更快
@Test
public void testFibonacci() {
    // 執行f(50000) 使用 110ms
    // 輸出 f(50000) = 17438開頭的10450位長的整數
    long begin = System.currentTimeMillis();
    int n = 50000;
    log.info("斐波那契數列f({}) = {}", n, FibonacciUtil.fibonacci(n));
    long end = System.currentTimeMillis();
    log.info("函數方式執行時間:{}ms", end - begin);
}

以上代碼見 StreamOtherTest 。

Fork/Join 最大的優點是提供了工作竊取演算法,可以在多核CPU處理器上加速並行處理,他並非多線程開發替代品。

那麼他們之間有什麼區別呢?

Fork/Join框架是從jdk7中新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入希望的線程數量,那麼當前電腦可用的CPU數量會被設置為線程數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會做出同樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務隊列中再添加一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼差異呢?

首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關係的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個線程,顯然這是不可行的。

在實踐中,ThreadPoolExecutor通常用於同時(並行)處理許多獨立請求(又稱為事務),Fork/Join通常用於加速一項連貫的工作任務。

parallelStream 並行化

parallelStream 其實就是一個並行執行的流.它通過預設的 ForkJoinPool ,可以提高你的多線程任務的速度。parallelStream 具有並行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作,可以並行處理。

parallelStream 的使用

使用方式:

  1. 創建時返回並行流:如 Collection.parallelStream()
  2. 過程中轉換為並行流:如 Stream.parallel()
  3. 如果需要,轉換為順序流:Stream.sequential()
// 並行流時,並非按照1,2,3...500的順序輸出
IntStream.range(1, 500).parallel().forEach(System.out::println);

parallelStream 的陷阱

由於 parallelStream 使用的是 ForkJoinPool 中的 commonPool,該方法預設創建程式運行時所在電腦處理器內核數量的線程,當同時存在多個工作並行執行時,ForkJoinPool 中的線程將被消耗完,而當有的worker因為執行耗時操作,將導致其他工作也被阻塞,而此時我們也不清楚哪個任務導致了阻塞。這就是 parallelStream 的陷阱。

parallelStream 是無法預測的,而且想要正確地使用它有些棘手。幾乎任何 parallelStream 的使用都會影響程式中其他部分的性能,而且是一種無法預測的方式。但是在調用stream.parallel() 或者 parallelStream() 時候在我的代碼里之前我仍然會重新審視一遍他給我的程式究竟會帶來什麼問題,他能有多大的提升,是否有使用他的意義。

那麼到底是使用 stream 還是 parallelStream 呢?通過下麵3個標準來鑒定

1. 是否需要並行?

在回答這個問題之前,你需要弄清楚你要解決的問題是什麼,數據量有多大,計算的特點是什麼?並不是所有的問題都適合使用併發程式來求解,比如當數據量不大時,順序執行往往比並行執行更快。畢竟,準備線程池和其它相關資源也是需要時間的。但是,當任務涉及到I/O操作並且任務之間不互相依賴時,那麼並行化就是一個不錯的選擇。通常而言,將這類程式並行化之後,執行速度會提升好幾個等級。

2. 任務之間是否是獨立的?是否會引起任何競態條件?

如果任務之間是獨立的,並且代碼中不涉及到對同一個對象的某個狀態或者某個變數的更新操作,那麼就表明代碼是可以被並行化的。

3. 結果是否取決於任務的調用順序?

由於在並行環境中任務的執行順序是不確定的,因此對於依賴於順序的任務而言,並行化也許不能給出正確的結果。

創建流的其他方式

我們在第1篇中記錄了幾種創建流的方式,但還是遺漏了一部分,再此稍作補充。

從I/O通道

方式1:從緩存流中讀取為Stream,詳見如下代碼:

final String name = "明玉";
// 從網路上讀取文字內容
new BufferedReader(
        new InputStreamReader(
                new URL("https://www.txtxzz.com/txt/download/NWJhZjI3YjIzYWQ3N2UwMTZiNDQwYWE3")
                // new URL("https://api.apiopen.top/getAllUrl")
                        .openStream()))
        .lines()
        .filter(str -> StrUtil.contains(str, name))
        .forEach(System.out::println);

方式2:從文件系統獲取下級路徑及文件,詳見如下代碼:

// 獲取文件系統的下級路徑及其文件
Files.walk(FileSystems.getDefault().getPath("D:\\soft"))
        .forEach(System.out::println);

方式3:從文件系統獲取文件內容,詳見如下代碼:

Files.lines(FileSystems.getDefault().getPath("D:\\", "a.txt"))
    // .parallel()
    .limit(200)
    .forEach(System.out::println);

方式4:讀取JarFile內的文件,詳見如下代碼:

new JarFile("D:\\J2EE_Tools\\repository\\org\\springframework\\spring-core\\5.2.6.RELEASE\\spring-core-5.2.6.RELEASE.jar")
        .stream()
        .filter(entry -> StrUtil.contains(entry.getName(), "Method"))
        .forEach(System.out::println);

獲取隨機數字流

使用類Random的ints、longs、doubles的方法,根據傳遞不同的參數,可以產生無限數字流、有限數字流、以及指定範圍的有限或無限數字流,示例如下:

double v = new Random()
        .doubles(30, 2, 45)
        .peek(System.out::println)
        .max()
        .getAsDouble();
log.info("一串隨機數的最大值為:{}", v);

位向量流

將BitSet中位向量為真的轉換為Stream,示例如下:

BitSet bitSet = new BitSet(8);
bitSet.set(1);
bitSet.set(6);
log.info("cardinality值{}", bitSet.cardinality());
bitSet.stream().forEach(System.out::println);

正則分割流

將字元串按照正則表達式分隔成子串流,示例如下:

Pattern.compile(":")
        .splitAsStream("boo:and:foo")
        .map(String::toUpperCase)
        .forEach(System.out::println);

Stream 的其他方法

轉為無序流

使用 unordered() 方法可將 Stream 隨時轉為無序流。

轉換為Spliterator

使用 spliterator() 方法可將 Stream 轉為 Spliterator,Spliterator 介紹請看 https://juejin.im/post/5cf2622de51d4550bf1ae7ff

綜合示例

根據1962年第1屆百花獎至2018年第34屆百花獎數據,有以下數據,編寫代碼按照獲得最佳男主角的演員次數排名,次數相同的按照參演年份正序排,並列印他所參演的電影。

序號 最佳男主角 電影
第1屆1962年 崔嵬 《紅旗譜》
第2屆1963年 張良 《哥倆好
第3屆1980年 李仁堂 《淚痕》
第4屆1981年 達式常 《燕歸來》
第5屆1982年 王心剛 《知音》
第6屆1983年 嚴順開 《阿Q正傳》
第7屆1984年 楊在葆 《血,總是熱的》
第8屆1985年 呂曉禾 《高山下的花環》
第9屆1986年 楊在葆 《代理市長》
第10屆1987年 薑文 《芙蓉鎮》
第11屆1988年 張藝謀 《老井》
第12屆1989年 薑文 《春桃》
第13屆1990年 古月 《開國大典》
第14屆1991年 李雪健 《焦裕祿》
第15屆1992年 王鐵成 《周恩來》
第16屆1993年 古月 《毛ze東的故事》
第17屆1994年 李保田 《鳳凰琴》
第18屆1995年 李仁堂 《被告山杠爺》
第19屆1996年 張國立 《混在北京》
第20屆1997年 高明 《孔繁森》
第21屆1998年 葛優 《甲方乙方》
第22屆1999年 趙本山 《男婦女主任》
第23屆2000年 潘長江 《明天我愛你》
第24屆2001年 王慶祥 《生死抉擇》
第25屆2002年 葛優 《大腕》
第26屆2003年 盧奇 《鄧小平》
第27屆2004年 葛優 《手機》
第27屆2004年 李幼斌 《驚心動魂》
第28屆2006年 吳軍 《張思德》
第29屆2008年 張涵予 《集結號》
第30屆2010年 陳坤 《畫皮》
第31屆2012年 文章 《失戀33天》
第32屆2014年 黃曉明 《中國合伙人》
第33屆2016年 馮紹峰 《狼圖騰》
第34屆2018年 吳京 《戰狼2》

根據題目要求,創建 HundredFlowersAwards 實體用來存儲上述數據,我們分析題目要求最終需要轉換為以演員為主的信息,然後再根據演員的獲獎次數及出演年份做排序。
所以創建 ActorInfo 實體,包含 演員姓名和出演電影的信息。出演電影也需創建實體 FilmInfo ,包含 出演年份和電影名稱。

有瞭如上存儲數據實體信息後,代碼實現邏輯如下:

  1. 將百花獎的集合數據轉換為 Stream
  2. 將該數據流轉換為Map類型,Map 的 key 為演員名,Map 的 Value 為演員信息
  3. 對於重覆出現的演員,我們需要把電影信息追加到該演員出現的電影列表中
  4. 對於處理完的 Map 數據,將該 Map 的 values 數據再次轉換為 Stream
  5. 將該 Stream 排序即可。
list.stream()
    .collect(Collectors.toMap(HundredFlowersAwards::getActorName, ActorInfo::new, ActorInfo::addFilmInfos))
    .values()
    .stream()
    .sorted(new ActorComparator())
    .forEach(System.out::println);

本節代碼見 StreamOtherTest 。

經過幾天的學習和總結,以上就是 Java Stream Api 的全部內容了。從開始認識 Stream Api,我們逐漸瞭解了使用 Stream Api 的流程:創建 Stream 、中間操作、終端操作。
我們對創建 Stream 、中間操作、終端操作的各個 api 方法進行了介紹及案例演示,之後我們還單獨抽出一節講解了 Collector 介面的實現及使用。
上述內容雖然文字不多,大部分都在代碼中給出了演示,希望大家能下載下來代碼並運行,以加深印象。

以上是前傳部分的學習內容了,接下來我們將進入到 Reactor 部分的學習。

源碼下載:https://github.com/crystalxmumu/spring-web-flux-study-note

參考

  1. 【Java8新特性】關於Java8的Stream API,看這一篇就夠了{:target="_blank"}
  2. 一文秒懂 Java Fork/Join{:target="_blank"}
  3. 深入淺出parallelStream{:target="_blank"}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 生活中我們會拍很多的證件照,有的要求紅底,有的是白底,有的是藍底,今天不通過摳圖,實現一鍵換底片!想換什麼換什麼 知識點: 1.圖像處理 2.OpenCV 3.numpy 4 ...
  • Windows下C,C++開發環境搭建指南 前情提要 基於近一段時間很多網友發郵件反饋,說一些項目編譯出現問題,諸如此類的情況。 就覺得很有必要寫一篇C,C++開發環境的小指南,統一回覆。 1.君欲善其事必先利其器 1.1.輔助開發利器推薦 作為一個老碼農,有一些個人長期使用的輔助工具,分享給大家, ...
  • 疫情原因,我也不得不走上了面試之路,先是在網上收集了各種面試資料,再是閉關啃題看源碼。這一路走來的辛酸,在拿到offer的那一刻讓我覺得是值得的。為了讓大家多吸取一些經驗能順利的進大廠,順便把我自己的一些能用上的資料分享給大家,希望對大家有所幫助,早日進入心儀的大廠!年薪百萬! ...
  • 在JDBC中使用預編譯PreparedStatement 以及它的優點 步驟 1 : 使用PreparedStatement 和 Statement一樣,PreparedStatement也是用來執行sql語句的 與創建Statement不同的是,需要根據sql語句創建PreparedStateme ...
  • SQL--事務 博客說明 文章所涉及的資料來自互聯網整理和個人總結,意在於個人學習和經驗彙總,如有什麼地方侵權,請聯繫本人刪除,謝謝! 概念 如果一個包含多個步驟的業務操作,被事務管理,那麼這些操作要麼同時成功,要麼同時失敗 操作 1. 開啟事務: start transaction; 2. 回滾: ...
  • 不知從何時起,Python和爬蟲就如初戀一般,情不知所起,一往而深,相信很多朋友學習Python,都是從爬蟲開始,其實究其原因,不外兩方面:其一Python對爬蟲的支持度比較好,類庫眾多。其二Pyhton的語法簡單,入門容易。所以兩者形影相隨,不離不棄,本文主要以一個簡單的小例子,簡述Python在... ...
  • Synchronized關鍵字可以用來修飾方法或者代碼塊。對於同步方法,JVM 採用 ACC_SYNCHRONIZED 標記符來實現同步。 對於同步代碼塊。JVM 採用 monitorenter、monitorexit 兩個指令來實現同步。 在JDK1.6之後對對synchronized鎖進行了升級 ...
  • SQL--多表查詢(mysql) 博客說明 文章所涉及的資料來自互聯網整理和個人總結,意在於個人學習和經驗彙總,如有什麼地方侵權,請聯繫本人刪除,謝謝! 笛卡爾積 有兩個集合A,B .取這兩個集合的所有組成情況。 要完成多表查詢,需要消除無用的數據 分類 內連接查詢 1. 從哪些表中查詢數據 2. ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...