本節探討Java 8中的函數式數據處理 - Stream API,它能大大簡化常見的集合數據操作,怎麼做到的呢? ...
上節我們介紹了Lambda表達式和函數式介面,本節探討它們的應用,函數式數據處理,針對常見的集合數據處理,Java 8引入了一套新的類庫,位於包java.util.stream下,稱之為Stream API,這套API操作數據的思路,不同於我們在38節到55節介紹的容器類API,它們是函數式的,非常簡潔、靈活、易讀,具體有什麼不同呢?由於內容較多,我們分為兩節來介紹,本節先介紹一些基本的API,下節討論一些高級功能。
基本概念
介面Stream類似於一個迭代器,但提供了更為豐富的操作,Stream API的主要操作就定義在該介面中。 Java 8給Collection介面增加了兩個預設方法,它們可以返回一個Stream,如下所示:
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); } default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }
stream()返回的是一個順序流,parallelStream()返回的是一個並行流。順序流就是由一個線程執行操作。而並行流背後可能有多個線程並行執行,與之前介紹的併發技術不同,使用並行流不需要顯式管理線程,使用方法與順序流是一樣的。
下麵,我們主要針對順序流,學習Stream介面,包括其用法和基本原理,隨後我們再介紹下並行流。先來看一些簡單的示例。
基本示例
上節演示時使用了學生類Student和學生列表List<Student> lists,本節繼續使用它們。
基本過濾
返回學生列表中90分以上的,傳統上的代碼一般是這樣的:
List<Student> above90List = new ArrayList<>(); for (Student t : students) { if (t.getScore() > 90) { above90List.add(t); } }
使用Stream API,代碼可以這樣:
List<Student> above90List = students.stream() .filter(t->t.getScore()>90) .collect(Collectors.toList());
先通過stream()得到一個Stream對象,然後調用Stream上的方法,filter()過濾得到90分以上的,它的返回值依然是一個Stream,為了轉換為List,調用了collect方法並傳遞了一個Collectors.toList(),表示將結果收集到一個List中。
代碼更為簡潔易讀了,這種數據處理方式被稱為函數式數據處理,與傳統代碼相比,它的特點是:
- 沒有顯式的迴圈迭代,迴圈過程被Stream的方法隱藏了
- 提供了聲明式的處理函數,比如filter,它封裝了數據過濾的功能,而傳統代碼是命令式的,需要一步步的操作指令
- 流暢式介面,方法調用鏈接在一起,清晰易讀
基本轉換
根據學生列表返回名稱列表,傳統上的代碼一般是這樣:
List<String> nameList = new ArrayList<>(students.size()); for (Student t : students) { nameList.add(t.getName()); }
使用Stream API,代碼可以這樣:
List<String> nameList = students.stream()
.map(Student::getName)
.collect(Collectors.toList());
這裡使用了Stream的map函數,它的參數是一個Function函數式介面,這裡傳遞了方法引用。
基本的過濾和轉換組合
返回90分以上的學生名稱列表,傳統上的代碼一般是這樣:
List<String> nameList = new ArrayList<>(); for (Student t : students) { if (t.getScore() > 90) { nameList.add(t.getName()); } }
使用函數式數據處理的思路,可以將這個問題分解為由兩個基本函數實現:
- 過濾:得到90分以上的學生列表
- 轉換:將學生列表轉換為名稱列表
使用Stream API,可以將基本函數filter()和map()結合起來,代碼可以這樣:
List<String> above90Names = students.stream() .filter(t->t.getScore()>90) .map(Student::getName) .collect(Collectors.toList());
這種組合利用基本函數、聲明式實現集合數據處理功能的編程風格,就是函數式數據處理。
代碼更為直觀易讀了,但你可能會擔心它的性能有問題。filter()和map()都需要對流中的每個元素操作一次,一起使用會不會就需要遍歷兩次呢?答案是否定的,只需要一次。實際上,調用filter()和map()都不會執行任何實際的操作,它們只是在構建操作的流水線,調用collect才會觸發實際的遍歷執行,在一次遍歷中完成過濾、轉換以及收集結果的任務。
像filter和map這種不實際觸發執行、用於構建流水線、返回Stream的操作被稱為中間操作(intermediate operation),而像collect這種觸發實際執行、返回具體結果的操作被稱為終端操作(terminal operation)。Stream API中還有更多的中間和終端操作,下麵我們具體來看下。
中間操作
除了filter和map,Stream API的中間操作還有distinct, sorted, skip, limit, peek, mapToLong, mapToInt, mapToDouble, flatMap等,我們逐個來看下。
distinct
distinct返回一個新的Stream,過濾重覆的元素,只留下唯一的元素,是否重覆是根據equals方法來比較的,distinct可以與其他函數如filter, map結合使用。
比如,返回字元串列表中長度小於3的字元串、轉換為小寫、只保留唯一的,代碼可以為:
List<String> list = Arrays.asList(new String[]{"abc","def","hello","Abc"}); List<String> retList = list.stream() .filter(s->s.length()<=3) .map(String::toLowerCase) .distinct() .collect(Collectors.toList()); System.out.println(retList);
輸出為:
[abc, def]
雖然都是中間操作,但distinct與filter和map是不同的,filter和map都是無狀態的,對於流中的每一個元素,它的處理都是獨立的,處理後即交給流水線中的下一個操作,但distinct不同,它是有狀態的,在處理過程中,它需要在內部記錄之前出現過的元素,如果已經出現過,即重覆元素,它就會過濾掉,不傳遞給流水線中的下一個操作。
對於順序流,內部實現時,distinct操作會使用HashSet記錄出現過的元素,如果流是有順序的,需要保留順序,會使用LinkedHashSet。
sorted
有兩個sorted方法:
Stream<T> sorted() Stream<T> sorted(Comparator<? super T> comparator)
它們都對流中的元素排序,都返回一個排序後的Stream,第一個方法假定元素實現了Comparable介面,第二個方法接受一個自定義的Comparator。
比如,過濾得到90分以上的學生,然後按分數從高到低排序,分數一樣的,按名稱排序,代碼可以為:
List<Student> list = students.stream() .filter(t->t.getScore()>90) .sorted(Comparator.comparing(Student::getScore) .reversed() .thenComparing(Student::getName)) .collect(Collectors.toList());
這裡,使用了Comparator的comparing, reversed和thenComparing構建了Comparator。
與distinct一樣,sorted也是一個有狀態的中間操作,在處理過程中,需要在內部記錄出現過的元素,與distinct不同的是,每碰到流中的一個元素,distinct都能立即做出處理,要麼過濾,要麼馬上傳遞給下一個操作,但sorted不能,它需要先排序,為了排序,它需要先在內部數組中保存碰到的每一個元素,到流結尾時,再對數組排序,然後再將排序後的元素逐個傳遞給流水線中的下一個操作。
skip/limit
它們的定義為:
Stream<T> skip(long n) Stream<T> limit(long maxSize)
skip跳過流中的n個元素,如果流中元素不足n個,返回一個空流,limit限制流的長度為maxSize。
比如,將學生列表按照分數排序,返回第3名到第5名,代碼可以為:
List<Student> list = students.stream() .sorted(Comparator.comparing( Student::getScore).reversed()) .skip(2) .limit(3) .collect(Collectors.toList());
skip和limit都是有狀態的中間操作。對前n個元素,skip的操作就是過濾,對後面的元素,skip就是傳遞給流水線中的下一個操作。limit的一個特點是,它不需要處理流中的所有元素,只要處理的元素個數達到maxSize,後面的元素就不需要處理了,這種可以提前結束的操作被稱為短路操作。
peek
peek的定義為:
Stream<T> peek(Consumer<? super T> action)
它返回的流與之前的流是一樣的,沒有變化,但它提供了一個Consumer,會將流中的每一個元素傳給該Consumer。這個方法的主要目的是支持調試,可以使用該方法觀察在流水線中流轉的元素,比如:
List<String> above90Names = students.stream() .filter(t->t.getScore()>90) .peek(System.out::println) .map(Student::getName) .collect(Collectors.toList());
mapToLong/mapToInt/mapToDouble
map函數接受的參數是一個Function<T, R>,為避免裝箱/拆箱,提高性能,Stream還有如下返回基本類型特定流的方法:
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) IntStream mapToInt(ToIntFunction<? super T> mapper) LongStream mapToLong(ToLongFunction<? super T> mapper)
DoubleStream/IntStream/LongStream是基本類型特定的流,有一些專門的更為高效的方法。比如,求學生列表的分數總和,代碼可以為:
double sum = students.stream() .mapToDouble(Student::getScore) .sum();
flatMap
flatMap的定義為:
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
它接受一個函數mapper,對流中的每一個元素,mapper會將該元素轉換為一個流Stream,然後把新生成流的每一個元素傳遞給下一個操作。比如:
List<String> lines = Arrays.asList(new String[]{ "hello abc", "老馬 編程" }); List<String> words = lines.stream() .flatMap(line -> Arrays.stream(line.split("\\s+"))) .collect(Collectors.toList()); System.out.println(words);
這裡的mapper將一行字元串按空白符分隔為了一個單詞流,Arrays.stream可以將一個數組轉換為一個流,輸出為:
[hello, abc, 老馬, 編程]
可以看出,實際上,flatMap完成了一個1到n的映射。
針對基本類型,flatMap還有如下類似方法:
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)
終端操作
中間操作不觸發實際的執行,返回值是Stream,而終端操作觸發執行,返回一個具體的值,除了collect,Stream API的終端操作還有max, min, count, allMatch, anyMatch, noneMatch, findFirst, findAny, forEach, toArray, reduce等,我們逐個來看下。
max/min
max/min的定義為:
Optional<T> max(Comparator<? super T> comparator) Optional<T> min(Comparator<? super T> comparator)
它們返迴流中的最大值/最小值,值的註意的是,它的返回值類型是Optional<T>,而不是T。
java.util.Optional是Java 8引入的一個新類,它是一個泛型容器類,內部只有一個類型為T的單一變數value,可能為null,也可能不為null。Optional有什麼用呢?它用於準確地傳遞程式的語義,它清楚地表明,其代表的值可能為null,程式員應該進行適當的處理。
Optional定義了一些方法,比如:
// value不為null時返回true public boolean isPresent() // 返回實際的值,如果為null,拋出異常NoSuchElementException public T get() // 如果value不為null,返回value,否則返回other public T orElse(T other) // 構建一個空的Optional,value為null public static<T> Optional<T> empty() // 構建一個非空的Optional, 參數value不能為null public static <T> Optional<T> of(T value) // 構建一個Optional,參數value可以為null,也可以不為null public static <T> Optional<T> ofNullable(T value)
在max/min的例子中,通過聲明返回值為Optional,我們就知道,具體的返回值不一定存在,這發生在流中不含任何元素的情況下。
看個簡單的例子,返回分數最高的學生,代碼可以為:
Student student = students.stream()
.max(Comparator.comparing(Student::getScore).reversed())
.get();
這裡,假定students不為空。
count
count很簡單,就是返迴流中元素的個數。比如,統計大於90分的學生個數,代碼可以為:
long above90Count = students.stream() .filter(t->t.getScore()>90) .count();
allMatch/anyMatch/noneMatch
這幾個函數都接受一個謂詞Predicate,返回一個boolean值,用於判定流中的元素是否滿足一定的條件,它們的區別是:
- allMatch: 只有在流中所有元素都滿足條件的情況下才返回true
- anyMatch: 只要流中有一個元素滿足條件就返回true
- noneMatch: 只有流中所有元素都不滿足條件才返回true
如果流為空,這幾個函數的返回值都是true。
比如,判斷是不是所有學生都及格了(不小於60分),代碼可以為:
boolean allPass = students.stream() .allMatch(t->t.getScore()>=60);
這幾個操作都是短路操作,都不一定需要處理所有元素就能得出結果,比如,對於allMatch,只要有一個元素不滿足條件,就能返回false。
findFirst/findAny
它們的定義為:
Optional<T> findFirst()
Optional<T> findAny()
它們的返回類型都是Optional,如果流為空,返回Optional.empty()。findFirst返回第一個元素,而findAny返回任一元素,它們都是短路操作。
隨便找一個不及格的學生,代碼可以為:
Optional<Student> student = students.stream() .filter(t->t.getScore()<60) .findAny(); if(student.isPresent()){ // 不及格的學生.... }
forEach
有兩個foreach方法:
void forEach(Consumer<? super T> action) void forEachOrdered(Consumer<? super T> action)
它們都接受一個Consumer,對流中的每一個元素,傳遞元素給Consumer,區別在於,在並行流中,forEach不保證處理的順序,而forEachOrdered會保證按照流中元素的出現順序進行處理。
比如,逐行列印大於90分的學生,代碼可以為:
students.stream() .filter(t->t.getScore()>90) .forEach(System.out::println);
toArray
toArray將流轉換為數組,有兩個方法:
Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)
不帶參數的toArray返回的數組類型為Object[],這經常不是期望的結果,如果希望得到正確類型的數組,需要傳遞一個類型為IntFunction的generator,IntFunction的定義為:
public interface IntFunction<R> { R apply(int value); }
generator接受的參數是流的元素個數,它應該返回對應大小的正確類型的數組。
比如,獲取90分以上的學生數組,代碼可以為:
Student[] above90Arr = students.stream() .filter(t->t.getScore()>90) .toArray(Student[]::new);
Student[]::new就是一個類型為IntFunction<Student[]>的generator。
reduce
reduce代表歸約或者叫摺疊,它是max/min/count的更為通用的函數,將流中的元素歸約為一個值,有三個reduce函數:
Optional<T> reduce(BinaryOperator<T> accumulator); T reduce(T identity, BinaryOperator<T> accumulator); <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
第一個基本等同於調用:
boolean foundAny = false; T result = null; for (T element : this stream) { if (!foundAny) { foundAny = true; result = element; } else result = accumulator.apply(result, element); } return foundAny ? Optional.of(result) : Optional.empty();
比如,使用reduce求分數最高的學生,代碼可以為:
Student topStudent = students.stream().reduce((accu, t) -> { if (accu.getScore() >= t.getScore()) { return accu; } else { return t; } }).get();
第二個reduce函數多了一個identity參數,表示初始值,它基本等同於調用:
T result = identity; for (T element : this stream) result = accumulator.apply(result, element) return result;
第一個和第二個reduce的返回類型只能是流中元素的類型,而第三個更為通用,它的歸約類型可以自定義,另外,它多了一個combiner參數,combiner用在並行流中,用於合併子線程的結果,對於順序流,它基本等同於調用:
U result = identity; for (T element : this stream) result = accumulator.apply(result, element) return result;
註意與第二個reduce函數相區分,它的結果類型不是T,而是U。比如,使用reduce函數計算學生分數的和,代碼可以為:
double sumScore = students.stream().reduce(0d, (sum, t) -> sum += t.getScore(), (sum1, sum2) -> sum1 += sum2 );
以上,可以看出,reduce雖然更為通用,但比較費解,難以使用,一般情況,應該優先使用其他函數。
collect函數比reduce更為通用、強大和易用,關於它,我們下節再詳細介紹。
構建流
前面我們提到,可以通過Collection介面的stream/parallelStream獲取流,還有一些其他的方式可以獲取流。
Arrays有一些stream方法,可以將數組或子數組轉換為流,比如:
public static IntStream stream(int[] array) public static DoubleStream stream(double[] array, int startInclusive, int endExclusive) public static <T> Stream<T> stream(T[] array)
比如,輸出當前目錄下所有普通文件的名字,代碼可以為:
File[] files = new File(".").listFiles(); Arrays.stream(files) .filter(File::isFile) .map(File::getName) .forEach(System.out::println);
Stream也有一些靜態方法,可以構建流:
//返回一個空流 public static<T> Stream<T> empty() //返回只包含一個元素t的流 public static<T> Stream<T> of(T t) //返回包含多個元素values的流 public static<T> Stream<T> of(T... values) //通過Supplier生成流,流的元素個數是無限的 public static<T> Stream<T> generate(Supplier<T> s) //同樣生成無限流,第一個元素為seed,第二個為f(seed),第三個為f(f(seed)),依次類推 public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
比如,輸出10個隨機數,代碼可以為:
Stream.generate(()->Math.random()) .limit(10) .forEach(System.out::println);
輸出100個遞增的奇數,代碼可以為:
Stream.iterate(1, t->t+2) .limit(100) .forEach(System.out::println);
並行流
前面我們主要使用的是Collection的stream()方法,換做parallelStream()方法,就會使用並行流,介面方法都是通用的。但並行流內部會使用多線程,線程個數一般與系統的CPU核數一樣,以充分利用CPU的計算能力。
進一步來說,並行流內部會使用Java 7引入的fork/join框架,簡單來說,處理由fork和join兩個階段組成,fork就是將要處理的數據拆分為小塊,多線程按小塊進行並行計算,join就是將小塊的計算結果進行合併,具體我們就不探討了。使用並行流,不需要任何線程管理的代碼,就能實現並行。
函數式數據處理思維
看的出來,使用Stream API處理數據集合,與直接使用容器類API處理數據的思路是完全不一樣的。
流定義了很多數據處理的基本函數,對於一個具體的數據處理問題,解決的主要思路就是組合利用這些基本函數,實現期望的功能,這種思路就是函數式數據處理思維,相比直接利用容器類API的命令式思維,思考的層次更高。
Stream API的這種思路也不是新發明,它與資料庫查詢語言SQL是很像的,都是聲明式地操作集合數據,很多函數都能在SQL中找到對應,比如filter對應SQL的where,sorted對應order by等。SQL一般都支持分組(group by)功能,Stream API也支持,但關於分組,我們下節再介紹。
Stream API也與各種基於Unix系統的管道命令類似,熟悉Unix系統的都知道,Unix有很多命令,大部分命令只是專註於完成一件事情,但可以通過管道的方式將多個命令鏈接起來,完成一些複雜的功能,比如:
cat nginx_access.log | awk '{print $1}' | sort | uniq -c | sort -rnk 1 | head -n 20
以上命令可以分析nginx訪問日誌,統計出訪問次數最多的前20個IP地址及其訪問次數。具體來說,cat命令輸出nginx訪問日誌到流,一行為一個元素,awk輸出行的第一列,這裡為IP地址,sort按IP進行排序,"uniq -c"按IP統計計數,"sort -rnk 1"按計數從高到低排序,"head -n 20"輸出前20行。
小結
本節初步介紹了Java 8引入的函數式數據處理類庫,Stream API,它類似於Unix的管道命令,也類似於資料庫查詢語言SQL,通過組合利用基本函數,可以在更高的層次上思考問題,以聲明式的方式簡潔地實現期望的功能。
對於collect方法,本節只是演示了最基本的應用,它還有很多高級功能,比如實現類似SQL的group by功能,具體怎麼實現?實現的原理是什麼呢?
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic,位於包shuo.laoma.java8.c92下)
----------------
未完待續,查看最新文章,敬請關註微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及電腦技術的本質。用心原創,保留所有版權。