經過前面 4 篇內容的學習,我們已經掌握了 Stream 大部分的知識,本節我們針對之前 Stream 未涉及的內容及周邊知識點做個補充。 ...
Stream API
經過前面 4 篇內容的學習,我們已經掌握了 Stream 大部分的知識,本節我們針對之前 Stream 未涉及的內容及周邊知識點做個補充。
Fork/Join 框架
fork/join 框架是 Java 7 中引入的新特性 ,它是一個工具,通過 「 分而治之 」 的方法嘗試將所有可用的處理器內核使用起來幫助加速並行處理。
在實際使用過程中,這種 「 分而治之 」的方法意味著框架首先要 fork ,遞歸地將任務分解為較小的獨立子任務,直到它們足夠簡單以便非同步執行。然後,join 部分開始工作,將所有子任務的結果遞歸地連接成單個結果,或者在返回 void 的任務的情況下,程式只是等待每個子任務執行完畢。
為了提供有效的並行執行,fork/join 框架使用了一個名為 ForkJoinPool 的線程池,用於管理 ForkJoinWorkerThread 類型的工作線程。
Fork/Join 優點
Fork/Join 架構使用了一種名為工作竊取( work-stealing )演算法來平衡線程的工作負載。
簡單來說,工作竊取演算法就是空閑的線程試圖從繁忙線程的隊列中竊取工作。
預設情況下,每個工作線程從其自己的雙端隊列中獲取任務。但如果自己的雙端隊列中的任務已經執行完畢,雙端隊列為空時,工作線程就會從另一個忙線程的雙端隊列尾部或全局入口隊列中獲取任務,因為這是最大概率可能找到工作的地方。
這種方法最大限度地減少了線程競爭任務的可能性。它還減少了工作線程尋找任務的次數,因為它首先在最大可用的工作塊上工作。
Fork/Join 使用
ForkJoinTask 是 ForkJoinPool 線程之中執行的任務的基本類型。我們日常使用時,一般不直接使用 ForkJoinTask ,而是擴展它的兩個子類中的任意一個
- 任務不返回結果 ( 返回 void ) 的 RecursiveAction
- 返回值的任務的 RecursiveTask
這兩個類都有一個抽象方法 compute() ,用於定義任務的邏輯。
我們所要做的,就是繼承任意一個類,然後實現 compute() 方法,步驟如下:
- 創建一個表示工作總量的對象
- 選擇合適的閾值
- 定義分割工作的方法
- 定義執行工作的方法
如下是使用 Fork/Join 方式實現的1至1000006587的 Fork/Join 方式累加,我們和單線程的迴圈累加做了下對比,在 Intel i5-4460 的 PC 機器下,單線程執行使用了 650 ms,使用了 Fork/Join 方式執行 210 ms,優化效果挺明顯。
public class NumberAddTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_0000;
private final int begin;
private final int end;
public NumberAddTask(int begin, int end) {
super();
this.begin = begin;
this.end = end;
}
@Override
protected Long compute() {
if (end - begin <= THRESHOLD) {
long sum = 0;
for(int i = begin; i <= end; i++) {
sum += i;
}
return sum;
}
int mid = (begin + end) /2;
NumberAddTask t1 = new NumberAddTask(begin, mid);
NumberAddTask t2 = new NumberAddTask(mid + 1, end);
ForkJoinTask.invokeAll(t1, t2);
return t1.join() + t2.join();
}
}
// 1至1000006587的Fork/Join方式累加
@Test
public void testAddForkJoin() {
long begin = System.currentTimeMillis();
int n = 10_0000_6587;
ForkJoinPool pool = ForkJoinPool.commonPool();
log.info("1 + 2 + ... {} = {}", n, pool.invoke(new NumberAddTask(1, n)));
long end = System.currentTimeMillis();
log.info("ForkJoin方式執行時間:{}ms", end - begin);
}
// 1至1000006587的單線程累加
@Test
public void testAddFunction() {
long begin = System.currentTimeMillis();
int n = 10_0000_6587;
long sum = 0;
for(int i = 1; i <= n; i++ ) {
sum += i;
}
log.info("1 + 2 + ... {} = {}", n, sum);
long end = System.currentTimeMillis();
log.info("函數方式執行時間:{}ms", end - begin);
}
Fork/Join 使用場景
我使用 Java 8 官方 Api 中 RecursiveTask 的示例,創建了一個計算斐波那契數列的 Fork/Join 實現,雖然官方也提到了這是愚蠢的實現斐波那契數列方法,甚至效果還不如單線程的遞歸計算,但是這也說明瞭 Fork/Join 並非萬能的。
@Test
public void testForkJoin() {
// 執行f(40) = 102334155使用3411ms
// 執行f(80) 2個多小時,無法計算出結果
long begin = System.currentTimeMillis();
int n = 40;
ForkJoinPool pool = ForkJoinPool.commonPool();
log.info("ForkJoinPool初始化時間:{}ms", System.currentTimeMillis() - begin);
log.info("斐波那契數列f({}) = {}", n, pool.invoke(new FibonacciTask(n)));
long end = System.currentTimeMillis();
log.info("ForkJoin方式執行時間:{}ms", end - begin);
}
// 不用遞歸計算斐波那契數列反而更快
@Test
public void testFibonacci() {
// 執行f(50000) 使用 110ms
// 輸出 f(50000) = 17438開頭的10450位長的整數
long begin = System.currentTimeMillis();
int n = 50000;
log.info("斐波那契數列f({}) = {}", n, FibonacciUtil.fibonacci(n));
long end = System.currentTimeMillis();
log.info("函數方式執行時間:{}ms", end - begin);
}
以上代碼見 StreamOtherTest 。
Fork/Join 最大的優點是提供了工作竊取演算法,可以在多核CPU處理器上加速並行處理,他並非多線程開發替代品。
那麼他們之間有什麼區別呢?
Fork/Join框架是從jdk7中新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入希望的線程數量,那麼當前電腦可用的CPU數量會被設置為線程數量作為預設值。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會做出同樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。
所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務隊列中再添加一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。
那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼差異呢?
首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關係的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個線程,顯然這是不可行的。
在實踐中,ThreadPoolExecutor通常用於同時(並行)處理許多獨立請求(又稱為事務),Fork/Join通常用於加速一項連貫的工作任務。
parallelStream 並行化
parallelStream 其實就是一個並行執行的流.它通過預設的 ForkJoinPool ,可以提高你的多線程任務的速度。parallelStream 具有並行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作,可以並行處理。
parallelStream 的使用
使用方式:
- 創建時返回並行流:如 Collection
.parallelStream() - 過程中轉換為並行流:如 Stream
.parallel() - 如果需要,轉換為順序流:Stream
.sequential()
// 並行流時,並非按照1,2,3...500的順序輸出
IntStream.range(1, 500).parallel().forEach(System.out::println);
parallelStream 的陷阱
由於 parallelStream 使用的是 ForkJoinPool 中的 commonPool,該方法預設創建程式運行時所在電腦處理器內核數量的線程,當同時存在多個工作並行執行時,ForkJoinPool 中的線程將被消耗完,而當有的worker因為執行耗時操作,將導致其他工作也被阻塞,而此時我們也不清楚哪個任務導致了阻塞。這就是 parallelStream 的陷阱。
parallelStream 是無法預測的,而且想要正確地使用它有些棘手。幾乎任何 parallelStream 的使用都會影響程式中其他部分的性能,而且是一種無法預測的方式。但是在調用stream.parallel() 或者 parallelStream() 時候在我的代碼里之前我仍然會重新審視一遍他給我的程式究竟會帶來什麼問題,他能有多大的提升,是否有使用他的意義。
那麼到底是使用 stream 還是 parallelStream 呢?通過下麵3個標準來鑒定
1. 是否需要並行?
在回答這個問題之前,你需要弄清楚你要解決的問題是什麼,數據量有多大,計算的特點是什麼?並不是所有的問題都適合使用併發程式來求解,比如當數據量不大時,順序執行往往比並行執行更快。畢竟,準備線程池和其它相關資源也是需要時間的。但是,當任務涉及到I/O操作並且任務之間不互相依賴時,那麼並行化就是一個不錯的選擇。通常而言,將這類程式並行化之後,執行速度會提升好幾個等級。
2. 任務之間是否是獨立的?是否會引起任何競態條件?
如果任務之間是獨立的,並且代碼中不涉及到對同一個對象的某個狀態或者某個變數的更新操作,那麼就表明代碼是可以被並行化的。
3. 結果是否取決於任務的調用順序?
由於在並行環境中任務的執行順序是不確定的,因此對於依賴於順序的任務而言,並行化也許不能給出正確的結果。
創建流的其他方式
我們在第1篇中記錄了幾種創建流的方式,但還是遺漏了一部分,再此稍作補充。
從I/O通道
方式1:從緩存流中讀取為Stream,詳見如下代碼:
final String name = "明玉";
// 從網路上讀取文字內容
new BufferedReader(
new InputStreamReader(
new URL("https://www.txtxzz.com/txt/download/NWJhZjI3YjIzYWQ3N2UwMTZiNDQwYWE3")
// new URL("https://api.apiopen.top/getAllUrl")
.openStream()))
.lines()
.filter(str -> StrUtil.contains(str, name))
.forEach(System.out::println);
方式2:從文件系統獲取下級路徑及文件,詳見如下代碼:
// 獲取文件系統的下級路徑及其文件
Files.walk(FileSystems.getDefault().getPath("D:\\soft"))
.forEach(System.out::println);
方式3:從文件系統獲取文件內容,詳見如下代碼:
Files.lines(FileSystems.getDefault().getPath("D:\\", "a.txt"))
// .parallel()
.limit(200)
.forEach(System.out::println);
方式4:讀取JarFile內的文件,詳見如下代碼:
new JarFile("D:\\J2EE_Tools\\repository\\org\\springframework\\spring-core\\5.2.6.RELEASE\\spring-core-5.2.6.RELEASE.jar")
.stream()
.filter(entry -> StrUtil.contains(entry.getName(), "Method"))
.forEach(System.out::println);
獲取隨機數字流
使用類Random的ints、longs、doubles的方法,根據傳遞不同的參數,可以產生無限數字流、有限數字流、以及指定範圍的有限或無限數字流,示例如下:
double v = new Random()
.doubles(30, 2, 45)
.peek(System.out::println)
.max()
.getAsDouble();
log.info("一串隨機數的最大值為:{}", v);
位向量流
將BitSet中位向量為真的轉換為Stream,示例如下:
BitSet bitSet = new BitSet(8);
bitSet.set(1);
bitSet.set(6);
log.info("cardinality值{}", bitSet.cardinality());
bitSet.stream().forEach(System.out::println);
正則分割流
將字元串按照正則表達式分隔成子串流,示例如下:
Pattern.compile(":")
.splitAsStream("boo:and:foo")
.map(String::toUpperCase)
.forEach(System.out::println);
Stream 的其他方法
轉為無序流
使用 unordered() 方法可將 Stream 隨時轉為無序流。
轉換為Spliterator
使用 spliterator() 方法可將 Stream 轉為 Spliterator,Spliterator 介紹請看 https://juejin.im/post/5cf2622de51d4550bf1ae7ff。
綜合示例
根據1962年第1屆百花獎至2018年第34屆百花獎數據,有以下數據,編寫代碼按照獲得最佳男主角的演員次數排名,次數相同的按照參演年份正序排,並列印他所參演的電影。
序號 | 最佳男主角 | 電影 |
---|---|---|
第1屆1962年 | 崔嵬 | 《紅旗譜》 |
第2屆1963年 | 張良 | 《哥倆好 |
第3屆1980年 | 李仁堂 | 《淚痕》 |
第4屆1981年 | 達式常 | 《燕歸來》 |
第5屆1982年 | 王心剛 | 《知音》 |
第6屆1983年 | 嚴順開 | 《阿Q正傳》 |
第7屆1984年 | 楊在葆 | 《血,總是熱的》 |
第8屆1985年 | 呂曉禾 | 《高山下的花環》 |
第9屆1986年 | 楊在葆 | 《代理市長》 |
第10屆1987年 | 薑文 | 《芙蓉鎮》 |
第11屆1988年 | 張藝謀 | 《老井》 |
第12屆1989年 | 薑文 | 《春桃》 |
第13屆1990年 | 古月 | 《開國大典》 |
第14屆1991年 | 李雪健 | 《焦裕祿》 |
第15屆1992年 | 王鐵成 | 《周恩來》 |
第16屆1993年 | 古月 | 《毛ze東的故事》 |
第17屆1994年 | 李保田 | 《鳳凰琴》 |
第18屆1995年 | 李仁堂 | 《被告山杠爺》 |
第19屆1996年 | 張國立 | 《混在北京》 |
第20屆1997年 | 高明 | 《孔繁森》 |
第21屆1998年 | 葛優 | 《甲方乙方》 |
第22屆1999年 | 趙本山 | 《男婦女主任》 |
第23屆2000年 | 潘長江 | 《明天我愛你》 |
第24屆2001年 | 王慶祥 | 《生死抉擇》 |
第25屆2002年 | 葛優 | 《大腕》 |
第26屆2003年 | 盧奇 | 《鄧小平》 |
第27屆2004年 | 葛優 | 《手機》 |
第27屆2004年 | 李幼斌 | 《驚心動魂》 |
第28屆2006年 | 吳軍 | 《張思德》 |
第29屆2008年 | 張涵予 | 《集結號》 |
第30屆2010年 | 陳坤 | 《畫皮》 |
第31屆2012年 | 文章 | 《失戀33天》 |
第32屆2014年 | 黃曉明 | 《中國合伙人》 |
第33屆2016年 | 馮紹峰 | 《狼圖騰》 |
第34屆2018年 | 吳京 | 《戰狼2》 |
根據題目要求,創建 HundredFlowersAwards 實體用來存儲上述數據,我們分析題目要求最終需要轉換為以演員為主的信息,然後再根據演員的獲獎次數及出演年份做排序。
所以創建 ActorInfo 實體,包含 演員姓名和出演電影的信息。出演電影也需創建實體 FilmInfo ,包含 出演年份和電影名稱。
有瞭如上存儲數據實體信息後,代碼實現邏輯如下:
- 將百花獎的集合數據轉換為 Stream
- 將該數據流轉換為Map類型,Map 的 key 為演員名,Map 的 Value 為演員信息
- 對於重覆出現的演員,我們需要把電影信息追加到該演員出現的電影列表中
- 對於處理完的 Map 數據,將該 Map 的 values 數據再次轉換為 Stream
- 將該 Stream 排序即可。
list.stream()
.collect(Collectors.toMap(HundredFlowersAwards::getActorName, ActorInfo::new, ActorInfo::addFilmInfos))
.values()
.stream()
.sorted(new ActorComparator())
.forEach(System.out::println);
本節代碼見 StreamOtherTest 。
經過幾天的學習和總結,以上就是 Java Stream Api 的全部內容了。從開始認識 Stream Api,我們逐漸瞭解了使用 Stream Api 的流程:創建 Stream 、中間操作、終端操作。
我們對創建 Stream 、中間操作、終端操作的各個 api 方法進行了介紹及案例演示,之後我們還單獨抽出一節講解了 Collector 介面的實現及使用。
上述內容雖然文字不多,大部分都在代碼中給出了演示,希望大家能下載下來代碼並運行,以加深印象。
以上是前傳部分的學習內容了,接下來我們將進入到 Reactor 部分的學習。
源碼下載:https://github.com/crystalxmumu/spring-web-flux-study-note
參考
- 【Java8新特性】關於Java8的Stream API,看這一篇就夠了{:target="_blank"}
- 一文秒懂 Java Fork/Join{:target="_blank"}
- 深入淺出parallelStream{:target="_blank"}