RxJS 的操作符(operators)是最有用的,儘管 Observable 是最基本的。操作符最基本的部分(pieces)就是以申明的方式允許複雜的非同步代碼組合簡化。 什麼是操作符? 操作符是函數。這裡有兩種操作符: 管道操作符(Pipeable Operators)是可以通過使用 管道傳輸到 ...
RxJS 的操作符(operators)是最有用的,儘管 Observable 是最基本的。操作符最基本的部分(pieces)就是以申明的方式允許複雜的非同步代碼組合簡化。
什麼是操作符?
操作符是函數。這裡有兩種操作符:
管道操作符(Pipeable Operators)是可以通過使用 observableInstance.pipe(operator())
管道傳輸到 Observable 對象。這些包括,filter(...),mergeMap(...)
。當調用他們時,它們不會改變已存在的 Observable 實例。相反,他們會返回一個新的 Observable,它在第一個 Observable 的基礎上的邏輯的 Observable。
管道操作符是一個函數,它有一個 Observable 作為輸入參數並且返回另一個 Observable。這是純函數(函數式)的操作:上一個 Observable 保留未修改。
管道操作符本質上就是一個純函數,它可以作為獨立函數被調用來創建一個新的 Observable。例如:of(1, 2, 3)
創建了一個 Observable,它會發送 1,2,3 一個接一個的。在下一接將詳細討論操作符的創建。
例如,操作符 map
被調用跟數組的 map 是類似的。就比如 [1, 2 , 3].map(x => x * x)
會返回 [1, 4, 9]
,被創建的 Observable 就像下麵:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
// value: 4
// value: 9
將會發送 1,4,9。其他操作符的用法 first:
import { of } from 'rxjs';
import { first } from 'rxjs/operators';
first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
註意 map
邏輯部分必須要動態構造,因為它必須被賦予映射函數來做。通過對比,first 是一個常數,但是還是得動態構造。作為一般實踐,所有的操作符都要被構造,無論他們是否需要參數。
管道(Piping)
管道操作符是函數,所以他們能像原生函數使用:op()(obs)
— 但是實際上,他們都互相交織在一起,很快就會演變得不利於閱讀的:op4()(op3()(op2()(op1()(obs))))
。所以基於這個原因,Observables 有一個方法 .pipe()
可以調用,它能完成相同的事,並且更易讀的:
obs.pipe(
op1(),
op2(),
op3(),
op4(),
)
作為一個風格,op()(obs)
永不這樣使用,甚至是只有一個操作符時,obs.pipe(op())
也是首選的。
創建操作符
什麼是創建操作符?為了區別管道操作符,創建操作符是一個函數,能被用來創建通過一些共同的預定義的行為或者關聯其他的 Observables 的 Observables 。
一個典型的創建操作符例子是 interval
函數。它要求一個數字類型作為輸入參數,並返回 Observable 作為輸出:
import { interval } from 'rxjs';
const observable = interval(1000 /* number of milliseconds */);
這裡查看所有的靜態創建操作符。
高階 Observables
Observables 大體上是有順序的發送值(數字或字元串),但是也有例外,處理 Observables 的 Observables 是有必要的,叫做高階 Observables。例如,想象你有一個 Observable,它發送文件的地址的字元串,這個字元串恰好是想要看到的。代碼如下:
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
);
http.get()
返回每個 URL 的 Observable 對象(可能是字元串或字元串數組)。現在你有一個 Observables 對象了,一個高階 Observable。
但是這個高階 Observable 是怎麼工作的呢?通常通過平坦化:通過某種方式將高階 Observable 轉換成一個普通的 Observable。例如:
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
);
concatAll()
操作訂閱了每個內部的 Observable,它會帶出來一個 Observable 對象,並複製所有的發送的值知道Observable 完成,然後繼續執行下一個。所有的值都是以這種方式串起來的。其他的扁平用處的操作(也叫鏈接操作符(join operators))
mergeAll()
—— 訂閱每個內部到達的 Observable 對象,然後發送它到達的每一個值switchAll()
—— 當內部的 Observable 第一個到達時訂閱(Observable),然後每個到達時發送值,但是當下個內部的 Observable 到達時,上一個訂閱取消,並訂閱這個新的 Observable。exhaust()
—— 當 Observable 到達時訂閱第一個,每個在它到達時發送值,在第一個完成之前,丟棄所有新到大的 Observable 對象,完成之後等待下一個 Observable。
就像組合了數組庫中許多的 map()
,flat()
(或者是 flatten()
),他們映射就等價於 RxJS 中的 扁平操作符 concatMap(),mergeMap()
,switchMap()
以及 exhaustMap()
。
彈珠圖標(Marble diagrams)
為瞭解釋操作符是如何工作的,本文還不足以描述。太多的操作符相關聯了,他們實例化可能以不同的方式延遲,舉例,節流,降低值發送(頻率)。為了更好的描述,圖標經常是最好的工具。彈珠圖表是可視化的表示 operator 是如何工作的,也包括了 Observable 輸入,operator 以及它們的參數,還有 Observable 的輸出。
在彈珠圖中,時間軸向右移動,並且圖描述了值 “彈珠” 在 Observable 運行的時候是如何發送的。
根據下麵你能看到彈珠圖的解析。
貫穿文檔節點,我們普遍使用 marble diagrams 來解釋 operator 是如何工作的。它們在其他的方面也是很有用的,比如在白板甚至是在我們的單元測試中(作為 ASCII 圖表)。
按照圖標的順序,我解釋下上面的意思:
- 這個從左到右的時間軸是表示輸入的 Observable 的執行過程
- 4,6,a,8 這些值是 Observable 要發送的值
- 緊接著的 “|” 是表示 “完成” 通知,表明這個 Observable 已經成功完成。
- 中間的盒子表明操作符,它傳遞上面輸入的 Observable 生成一個 Observable 作為輸出(下麵一條線)方框內的文本顯示了轉換的性質
- 調用操作符輸出 Observable
- “X” 表示這個輸出的 Observable 發送錯誤,表明異常終止。至此之後不會有任何值發送。
操作符的分類
有很多操作是使用目的是不同的,他們分這幾類:創建,轉換,過濾,聯合,多播,錯誤處理,公共等等。下麵的列表你會發現所有被分類的操作符。
為了完成的概述,你可以看這個 API 文檔
創建操作符
ajax
bindCallback
bindNodeCallback
defer
empty
from
fromEvent
fromEventPattern
generate
interval
of
range
throwError
timer
iif
鏈接創建操作
轉化操作符
buffer
bufferCount
bufferTime
bufferToggle
bufferWhen
concatMap
concatMapTo
exhaust
exhaustMap
expand
groupBy
map
mapTo
mergeMap
mergeMapTo
mergeScan
pairwise
partition
pluck
scan
switchMap
switchMapTo
window
windowCount
windowTime
windowToggle
windowWhen
過濾操作符
audit
auditTime
debounce
debounceTime
distinct
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
elementAt
filter
first
ignoreElements
last
sample
sampleTime
single
skip
skipLast
skipUntil
skipWhile
take
takeLast
takeUntil
takeWhile
throttle
throttleTime
連接操作符
你也可以看 連接創建操作符 一節
多播操作符
異常處理操作符
公共操作符
tap
delay
delayWhen
dematerialize
materialize
observeOn
subscribeOn
timeInterval
timestamp
timeout
timeoutWith
toArray
條件布爾操作符
數學和聚合操作符
創建自定義的 Observables
使用 pipe()
函數生成新的 Observable
如果在你的代碼里有常用的操作符序列,用 pipe()
函數來提取到新的操作符。甚至這個序列是不常見的,中斷它到單個的操作符,這樣能提高可讀性。
舉個例子,你可以生成一個函數,這個函數捨棄奇數的值並給偶數值加倍:
import { pipe } from 'rxjs';
import { filter, map } from 'rxjs';
function discardOddDoubleEven()
{
return pipe(
filter(v => !(v % 2)),
map(v => v +v),
);
}
(pipe()
函數是這樣類比的,但是又與 .pipe()
不是完全一樣)
從零創建新的操作符
這個非常複雜,如果你一定要寫一個不能通過現有的操作符組合而成的操作符(這是極少情況),你可以使用 Observable 構造函數從零寫一個操作符,就像下麵這樣:
import { Observable } from 'rxjs';
function delay(delayInMillis)
{
return (observable) => new Observable(observer => {
//這個函數將會沒當 Observable 訂閱的時候調用
const allTimerIDs = new Set();
const subscription = observable.subscribe({
next(value){
const timerID = setTimeout(() => {
observer.next(value);
allTimeIDs.delete(timerID);
}, delayInMillis);
allTimerIDs.add(timerID);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
}
});
//返回值是一個卸載函數
//它當新的 Observable 被取消訂閱時調用
return () => {
subscription.unsubscribe();
allTimerIDs.forEach(timerID => {
clearTimeout(timerID)
});
}
});
}
你必須要註意以下幾點
- 實現所有的 Observer 函數,
next()
,error()
,complete()
當訂閱這個輸入的 Observer 的時候。 - 實現一個卸載函數,它負責清理,當 Observable 完成的時候(在上面的例子是通過取消訂閱以及清除時間定時器函數)
- 返回一個從 Observable 構造傳遞過來的函數的卸載函數
當然,這個只是一個例子;delay()
操作符已經存在了。