Reactor 操作符 上篇文章我們將 Flux 和 Mono 的操作符分了 11 類,我們來繼續學習轉換類操作符的第 2 篇。 轉換類操作符 轉換類的操作符數量最多,平常過程中也是使用最頻繁的。 Flux#concatMap 將響應式流中元素順序轉換為目標類型的響應式流,之後再將這些流連接起來。該 ...
Reactor 操作符
上篇文章我們將 Flux 和 Mono 的操作符分了 11 類,我們來繼續學習轉換類操作符的第 2 篇。
轉換類操作符
轉換類的操作符數量最多,平常過程中也是使用最頻繁的。
Flux#concatMap
將響應式流中元素順序轉換為目標類型的響應式流,之後再將這些流連接起來。該方法提供了 2 個重載方法,傳遞的第 2 個參數為內部生成響應式流的預取數量。見圖知意:
Flux.range(3, 8)
.concatMap(n -> Flux.just(n - 10, n, n + 10), 3)
.subscribe(System.out::println);
Flux#concatMapDelayError
concatMapDelayError 和 concatMap 區別在於,當內部生成響應式流發出 error 時,是否延遲響應 error 。該方法提供了 3 個重載方法,支持傳遞參數:是否延遲發出錯誤和預取數量。
Flux.range(3, 8)
.concatMapDelayError(n -> {
if (n == 4) {
return Flux.error(new NullPointerException());
}
return Flux.just(n - 10, n, n + 10);
})
.subscribe(System.out::println, System.err::println);
Flux#concatIterable
concatIterable 和 concatMap 的區別在於 內部返回的類型不同,一個為 Iterable, 一個為 響應式流。見圖知意:
Flux.range(3, 8)
.publishOn(Schedulers.single())
.concatMapIterable(n -> {
if (n == 4) {
throw new NullPointerException();
}
return Arrays.asList(n - 10, n, n + 10);
})
.onErrorContinue((e, n) -> System.err.println("數據:" + n + ",發生錯誤:" + e))
.subscribe(System.out::println);
elapsed
收集響應式流中元素的間隔發出時間,轉換為 時間間隔 和 舊元素 組成的 Tuple2 的響應式流。見圖知意:
Flux.interval(Duration.ofMillis(300))
.take(20)
.elapsed(Schedulers.parallel())
.subscribe(System.out::println);
Thread.sleep(7000);
expand
從上層節點逐層展開方式遞歸展開樹形節點。
Flux.just(16, 18, 20)
.expand(n -> {
if (n % 2 == 0) {
return Flux.just(n / 2);
} else {
return Flux.empty();
}
})
.subscribe(System.out::println);
expandDeep
從上層節點逐個展開方式遞歸展開樹形節點。expand 和 expandDeep 的區別在於展開方式不同,另外它倆都提供了 capacityHint 指定遞歸時初始化容器的容量。
Flux.just(16, 18, 20)
.expandDeep(n -> {
if (n % 2 == 0) {
return Flux.just(n / 2);
} else {
return Flux.empty();
}
})
.subscribe(System.out::println);
總結
本篇我們介紹了 Reactor 部分的轉換類操作符,講解示例時都是單個操作符,相信大家都能理解。
由於最近學習時間不確定,內容比較少。無論工作還是生活的困難,我們只要堅持,終將會被剋服解決。今天的內容就學到這裡,我們下篇繼續學習 Reactor 的操作符。
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperator02Test 測試類。