上篇內容我們學習了Stream的大部分終端操作,我們這篇著重瞭解下Stream中重要的終端操作:collect。 ...
Stream API
上篇內容我們學習了Stream的大部分終端操作,我們這篇著重瞭解下Stream中重要的終端操作:collect。
collect 方法
序號 | 支持的類 | 方法定義 | 方法說明 |
---|---|---|---|
1 | Stream |
對此流的元素執行 mutable reduction操作。 | |
2 | Stream |
<R, A> R collect(Collector<? super T, A, R> collector); | 使用 Collector對此流的元素執行 mutable reduction Collector。 |
以下代碼見 StreamTerminalOperationTransformTest。
實現3參數轉換介面
序號1的方法,傳遞了3個參數,參數1為創建新結果容器的函數;參數2為累加器函數,將參數1和流內元素執行累加操作;參數3為組合器函數,並行執行時會使用該函數。
同步執行時,該方法相當於執行:
R result = supplier.get();
for (T element : this stream) {
accumulator.accept(result, element);
}
return result;
我們編寫如下代碼,看下實際效果
// 使用collect方法實現字元串連接
log.info("拼接字元串為:{}",
Stream.of("I", "love", "you", "too")
.collect(StringBuilder::new, (b1, b2) -> {
log.info("累加執行:{} + {}", b1, b2);
b1.append(b2);
}, (b1, b2) -> {
log.info("組合執行:{} ++ {}", b1, b2);
b1.append(b2);
})
.toString());
以上代碼將輸出如下日誌:
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + I
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:I + love
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:Ilove + you
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:Iloveyou + too
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字元串為:Iloveyoutoo
並行執行時,該方法相當於執行:
R result1 = supplier.get();
R result2 = supplier.get();
R result3 = supplier.get();
R result4 = supplier.get();
// 累加執行,此處為併發(多線程)執行,每行代表一個線程
accumulator.accept(result1, element1);
accumulator.accept(result2, element2);
accumulator.accept(result3, element3);
accumulator.accept(result4, element4);
// ...
// accumulator.accept(resultN, elementN);
// 開始組合,此處為併發(多線程)執行,每行代表一個線程
combiner.accept(result1, result2);
combiner.accept(result3, result4);
combiner.accept(result1, result3);
// combiner.accept(result1, resultN);
return result1;
將上述的代碼改為.parallel()方式調用,將輸出如下日誌:
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + you
[ForkJoinPool.commonPool-worker-3] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + I
[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + too
[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:you ++ too
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + love
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:I ++ love
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:Ilove ++ youtoo
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字元串為:Iloveyoutoo
註意:上述日誌中出現的ForkJoinPool.commonPool-worker-N為併發(多線程)執行時的線程名。
實現Collector介面
實現Collector需要實現如下4個介面:
// 一個創建並返回一個新的可變結果容器的函數。
Supplier<A> supplier();
// 將值摺疊成可變結果容器的函數。
BiConsumer<A, T> accumulator();
// 一個接受兩個部分結果並將其合併的函數。
BinaryOperator<A> combiner();
// 執行從中間累積類型 A到最終結果類型 R的最終 R 。
Function<A, R> finisher();
// 返回一個 Collector.Characteristics 類型的Set, 表示該收集容器的特征。
Set<Characteristics> characteristics();
collect方法執行時,他們的調用流程如下:
- 創建新的結果容器(supplier())
- 將新的數據元素併入結果容器(accumulator())
- 將兩個結果容器組合成一個(combiner())
- 在容器上執行可選的最終變換(finisher())
簡單來講,生成容器A,通過accumulator針對A及流元素T執行累加,(如果並行存在的話)對多個A執行組合combiner,最終執行finisher後由A轉換為R。對於使用者來說,A為中間變數,無關其實現細節。
我們實現一個計算整數流的平均數的Collector,代碼如下:
// 使用collector實現求ping均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.parallel()
.collect(new Collector<Integer, long[], Double>() {
@Override
public Supplier<long[]> supplier() {
return () -> new long[2];
}
@Override
public BiConsumer<long[], Integer> accumulator() {
return (a, t) -> {
log.info("{}累加{}", a, t);
a[0] += t;
a[1]++;
};
}
@Override
public BinaryOperator<long[]> combiner() {
return (a, b) -> {
log.info("{}組合{}", a, b);
a[0] += b[0];
a[1] += b[1];
return a;
};
}
@Override
public Function<long[], Double> finisher() {
return (a) -> a[1] == 0 ? 0 : new Long(a[0]).doubleValue() / a[1];
}
@Override
public Set<Characteristics> characteristics() {
Set<Characteristics> set = new HashSet<>();
set.add(Characteristics.CONCURRENT);
return set;
}
}
)
);
常用Collector
通過上面的示例,我們實現了一個自定義的Collector,我們發現實現一個自定義的Collector還是比較麻煩的,需要實現5個介面。
Java 開發者們已經想到了這個問題,他們額外提供了一個 of 方法,可以通過lambda的方式創建 collector,類似 collect 中傳遞幾個參數:提供者、累加器、組合器、完成器以及特征配置,此處我們就不細講了。
Java 開發者們更為貼心的為我們創建了一些常用的 Collector ,讓我們可以直接使用。這些常用的 Collector 實現放在 Collectors 類下,我們來瞭解下。
統計平均值 averagingXxx 的使用
Collectors 提供了 averagingDouble、averagingLong、averagingInt 3種統計平均值的 Collector 實現類,以下代碼以 averagingInt 為例,由於使用方式相似,我們就不舉例了。
// 使用collector實現求均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.averagingInt(n -> n))
);
統計元素個數 counting 的使用
該方法和 Stream 中的 count 方法一樣。
// 使用collector獲取元素數量
log.info("[1, 2, 3, 4, 5, 6]的個數:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.counting())
);
統計總和 summingXxx 的使用
Collectors 提供了 summingDouble、summingLong、summingInt 3種統計求和值的 Collecto r實現類,同時還提供了 summarizingDouble 、 summarizingLong 、summarizingInt 3種統計對象的 Collector 實現類,以下代碼以 summingInt 為例,由於使用方式相似,我們就不舉例了。
// 使用collector獲取總和
log.info("[1, 2, 3, 4, 5, 6]的總和:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.summingInt(n -> n))
);
統計最小元素 minBy 的使用
// 使用collector獲取最小元素
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.minBy(Integer::min))
.get()
);
統計最大元素 maxBy 的使用
// 使用collector獲取最da元素
log.info("[1, 2, 3, 4, 5, 6]的最大值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.maxBy(Integer::max))
.get()
);
統計累加處理 reducing 的使用
reducing 和 Stream 中的 reduce 操作方法類似,我們就不詳述了。
// 使用collector實現求均值
log.info("[1, 2, 3, 4, 5, 6]的求和:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.reducing(0, Integer::sum))
);
轉換映射 mapping 的使用
mapping 支持將 第一個參數的結果再次執行轉換,即向下游傳遞。
log.info("[1, 2, 3, 4, 5, 6]每個增加20後的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.mapping(n -> n + 20, Collectors.averagingInt(n -> n)))
);
轉換連接 joining 的使用
joining 提供了 3 種重載方法,支持傳遞 分隔符、首碼、尾碼等。
// 使用collector連接字元串
log.info("連接字元串為:{}",
Stream.of("I", "love", "you", "too")
.collect(Collectors.joining(" ", "Java, ", "!"))
);
轉換為集合 toList 的使用
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]轉換為集合:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toList())
);
轉換為Map toMap 的使用
toMap 提供了 3 種重載方法,除了指定 Key 和 Value 的生成器外,區別在於對於 Key 重覆時, Value的處理方式;以及初始Map的生成器。
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]轉換為Map:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toMap(Object::toString, n -> n, Integer::sum))
);
轉換為Set toSet 的使用
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的轉換為Set:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toSet())
);
轉換為分組 groupingBy 的使用
分組函數將流中元素按某種定義分組,也提供了 2 種重載方法,支持遞歸向下游分組。
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的分組數據:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.groupingBy(n -> n))
);
轉換為分區 partitioningBy 的使用
分區函數將流中元素按條件分為2組,也提供了 2 種重載方法,支持遞歸向下游分組。
log.info("[1, 2, 3, 4, 5, 6]的奇偶分區數據:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.partitioningBy(n -> n %2 == 0))
);
其他方法
Collectors 中還提供了 groupingByConcurrent 、 toCollection 、 toConcurrentMap 等幾種支持併發的 Collector 實現,用法基本和非併發的相同,我們就不詳述了。
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note
以上是本期筆記的內容,我們下期見。