Observable 的 Operators集合

来源:http://www.cnblogs.com/solodancer/archive/2017/12/02/7954846.html
-Advertisement-
Play Games

內容為整理博主文章: "https://juejin.im/user/58870f04128fe10065efc8d9/article" 個人覺得他對Operators的解說較容易理解和全面,顧把它們整理在一起,也方面查找。 Operators: Observable 的 Operators 是實例 ...


內容為整理博主文章:https://juejin.im/user/58870f04128fe10065efc8d9/article 個人覺得他對Operators的解說較容易理解和全面,顧把它們整理在一起,也方面查找。


Operators:Observable 的 Operators 是實例應用上最重要的部份,我們需要瞭解各種 Operators 的使用方式,才能輕鬆實現各種需求!Operators 就是一個個被附加到 Observable 型別的函數。
Marble diagrams:我們把描繪 observable 的圖示稱為 Marble diagrams。

我們用 來表達一小段時間,這些 串起就代表一個 observable

----------------

X (大寫 X)則代表有錯誤發生

---------------X

| 則代表 observable 結束

----------------|

小括弧代表著同步發生


map

Observable 的 map 方法使用上跟數組的 map 是一樣的,我們傳入一個 callback function,這個 callback function 會帶入每次發發送來的元素,然後我們回傳新的元素,如下

var source = Rx.Observable.interval(1000);
var newest = source.map(x => x + 1); 
newest.subscribe(console.log);
// 1
// 2
// 3
// 4
// 5..

用 Marble diagrams 表達就是

source: -----0-----1-----2-----3--...
            map(x => x + 1)
newest: -----1-----2-----3-----4--...

mapTo

mapTo 可以把傳進來的值改成一個固定的值,如下

var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2); 
newest.subscribe(console.log);
// 2
// 2
// 2
// 2..

用 Marble diagrams 表達就是

source: -----0-----1-----2-----3--...
            mapTo(2)
newest: -----2-----2-----2-----2--...

filter

filter 在使用上也跟數組的相同,我們要傳入一個 callback function,這個 function 會傳入每個被髮送的元素,並且回傳一個 boolean 值,如果為 true 的話就會保留,如果為 false 就會被濾掉,如下

var source = Rx.Observable.interval(1000);
var newest = source.filter(x => x % 2 === 0); 
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..

用 Marble diagrams 表達就是

source: -----0-----1-----2-----3-----4-...
            filter(x => x % 2 === 0)
newest: -----0-----------2-----------4-...

skip

可以略過前幾個發送元素的 operator: skip,示例如下:

var source = Rx.Observable.interval(1000);
var example = source.skip(3);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...

原本從 0 開始的就會變成從 3 開始,但是記得原本元素的等待時間仍然存在,也就是說此示例第一個取得的元素需要等 4 秒,用 Marble Diagram 表示如下

source : ----0----1----2----3----4----5--....
                skip(3)
example: -------------------3----4----5--...

takeLast

除了可以用 take 取前幾個之外,我們也可以倒過來取最後幾個,示例如下:

var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 4
// 5
// complete

這裡我們先取了前 6 個元素,再取最後兩個。所以最後會發送 4, 5, complete,這裡有一個重點,就是 takeLast 必須等到整個 observable 完成(complete),才能知道最後的元素有哪些,並且同步發送,如果用 Marble Diagram 表示如下

source : ----0----1----2----3----4----5|
                takeLast(2)
example: ------------------------------(45)|

這裡可以看到 takeLast 後,比須等到原本的 observable 完成後,才立即同步發送 4, 5, complete

last

跟 take(1) 相同,我們有一個 takeLast(1) 的簡化寫法,那就是 last() 用來取得最後一個元素

var source = Rx.Observable.interval(1000).take(6);
var example = source.last();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 5
// complete

用 Marble Diagram 表示如下

source : ----0----1----2----3----4----5|
                last()
example: ------------------------------(5)|

concat

concat 可以把多個 observable 實例合併成一個,示例如下

var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4,5,6)
var example = source.concat(source2, source3);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete

跟 concatAll 一樣,必須先等前一個 observable 完成(complete),才會繼續下一個,用 Marble Diagram 表示如下

source : ----0----1----2|
source2: (3)|
source3: (456)|
            concat()
example: ----0----1----2(3456)|

另外 concat 還可以當作靜態方法使用

var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3);
var source3 = Rx.Observable.of(4,5,6);
var example = Rx.Observable.concat(source, source2, source3);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

startWith

startWith 可以在 observable 的一開始塞要發送的元素,有點像 concat 但參數不是 observable 而是要發送的元素,使用示例如下

var source = Rx.Observable.interval(1000);
var example = source.startWith(0);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...

這裡可以看到我們在 source 的一開始塞了一個 0,讓 example 會在一開始就立即發送 0,用 Marble Diagram 表示如下

source : ----0----1----2----3--...
            startWith(0)
example: (0)----0----1----2----3--...

記得 startWith 的值是一開始就同步發出的,這個 operator 很常被用來保存程式的起始狀態

merge

merge 跟 concat 一樣都是用來合併 observable,但他們在行為上有非常大的不同!
讓我們直接來看例子吧

var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete

上面可以看得出來,merge 把多個 observable 同時處理,這跟 concat 一次處理一個 observable 是完全不一樣的,由於是同時處理行為會變得較為複雜,這裡我們用 Marble Diagram 會比較好解釋

source : ----0----1----2|
source2: --0--1--2--3--4--5|
            merge()
example: --0-01--21-3--(24)--5|

這裡可以看到 merge 之後的 example 在時間序上同時在跑 source 與 source2,當兩件事情同時發生時,會同步發送資料(被 merge 的在後面),當兩個 observable 都結束時才會真的結束。

merge 同樣可以當作靜態方法用

var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = Rx.Observable.merge(source, source2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

merge 的邏輯有點像是 OR(||),就是當兩個 observable 其中一個被觸發時都可以被處理,這很常用在一個以上的按鈕具有部分相同的行為。

例如一個影片播放器有兩個按鈕,一個是暫停(II),另一個是結束播放(口)。這兩個按鈕都具有相同的行為就是影片會被停止,只是結束播放會讓影片回到 00 秒,這時我們就可以把這兩個按鈕的事件 merge 起來處理影片暫停這件事。

var stopVideo = Rx.Observable.merge(stopButton, endButton);
stopVideo.subscribe(() => {
    // 暫停播放影片
})

combineLatest

它會取得各個 observable 最後送出的值,再輸出成一個值,我們直接看示例會比較好解釋

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.combineLatest(newest, (x, y) => x + y);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

第一次看到這個 output 應該都會很困惑,我們直接來看 Marble Diagram 吧

source : ----0----1----2|
newest : --0--1--2--3--4--5|
    combineLatest(newest, (x, y) => x + y);
example: ----01--23-4--(56)--7|

首先 combineLatest 可以接收多個 observable,最後一個參數是 callback function,這個 callback function 接收的參數數量跟合併的 observable 數量相同,依照示例來說,因為我們這裡合併了兩個 observable 所以後面的 callback function 就接收 x, y 兩個參數,x 會接收從 source 發送出來的值,y 會接收從 newest 發送出來的值。

最後一個重點就是一定會等兩個 observable 都曾有送值出來才會呼叫我們傳入的 callback,所以這段程式是這樣運行的

  • newest 送出了 0,但此時 source 並沒有送出過任何值,所以不會執行 callback
  • source 送出了 0,此時 newest 最後一次送出的值為 0,把這兩個數傳入 callback 得到 0。
  • newest 送出了 1,此時 source 最後一次送出的值為 0,把這兩個數傳入 callback 得到 1。
  • newest 送出了 2,此時 source 最後一次送出的值為 0,把這兩個數傳入 callback 得到 2。
  • source 送出了 1,此時 newest 最後一次送出的值為 2,把這兩個數傳入 callback 得到 3。
  • newest 送出了 3,此時 source 最後一次送出的值為 1,把這兩個數傳入 callback 得到 4。
  • source 送出了 2,此時 newest 最後一次送出的值為 3,把這兩個數傳入 callback 得到 5。
  • source 結束,但 newest 還沒結束,所以 example 還不會結束。
  • newest 送出了 4,此時 source 最後一次送出的值為 2,把這兩個數傳入 callback 得到 6。
  • newest 送出了 5,此時 source 最後一次送出的值為 2,把這兩個數傳入 callback 得到 7。
  • newest 結束,因為 source 也結束了,所以 example 結束。

不管是 source 還是 newest 送出值來,只要另一方曾有送出過值(有最後的值),就會執行 callback 並送出新的值,這就是 combineLatest。

combineLatest 很常用在運算多個因數的結果,例如最常見的 BMI 計算,我們身高變動時就拿上一次的體重計算新的 BMI,當體重變動時則拿上一次的身高計算 BMI,這就很適合用 combineLatest 來處理!

zip

zip 會取每個 observable 相同順位的元素並傳入 callback,也就是說每個 observable 的第 n 個元素會一起被傳入 callback,這裡我們同樣直接用示例講解會比較清楚

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.zip(newest, (x, y) => x + y);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete

Marble Diagram 長這樣

source : ----0----1----2|
newest : --0--1--2--3--4--5|
    zip(newest, (x, y) => x + y)
example: ----0----2----4|

以我們的示例來說,zip 會等到 source 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 source 跟 newest 都送出了第二個元素再一起傳入 callback,所以運行的步驟如下:

  • newest 送出了第一個值 0,但此時 source 並沒有送出第一個值,所以不會執行 callback。
  • source 送出了第一個值 0,newest 之前送出的第一個值為 0,把這兩個數傳入 callback 得到 0。
  • newest 送出了第二個值 1,但此時 source 並沒有送出第二個值,所以不會執行 callback。
  • newest 送出了第三個值 2,但此時 source 並沒有送出第三個值,所以不會執行 callback。
  • source 送出了第二個值 1,newest 之前送出的第二個值為 1,把這兩個數傳入 callback 得到 2。
  • newest 送出了第四個值 3,但此時 source 並沒有送出第四個值,所以不會執行 callback。
  • source 送出了第三個值 2,newest 之前送出的第三個值為 2,把這兩個數傳入 callback 得到 4。

  • source 結束 example 就直接結束,因為 source 跟 newest 不會再有對應順位的值

zip 會把各個 observable 相同順位送出的值傳入 callback,這很常拿來做 demo 使用,比如我們想要間隔 100ms 送出 'h', 'e', 'l', 'l', 'o',就可以這麼做

var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);
var example = source.zip(source2, (x, y) => x);

這裡的 Marble Diagram 就很簡單

source : (hello)|
source2: -0-1-2-3-4-...
    zip(source2, (x, y) => x)
example: -h-e-l-l-o|

這裡我們利用 zip 來達到原本只能同步送出的資料變成了非同步的,很適合用在建立示範用的資料

建議大家平常沒事不要亂用 zip,除非真的需要。因為 zip 必須 cache 住還沒處理的元素,當我們兩個 observable 一個很快一個很慢時,就會 cache 非常多的元素,等待比較慢的那個 observable。這很有可能造成記憶體相關的問題

withLatestFrom

withLatestFrom 運行方式跟 combineLatest 有點像,只是他有主從的關係,只有在主要的 observable 送出新的值時,才會執行 callback,附隨的 observable 只是在背景下運行。讓我們看一個例子

var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
var example = main.withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
});
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

先看一下 Marble Diagram

main   : ----h----e----l----l----o|
some   : --0--1--0--0--0--1|
    withLatestFrom(some, (x, y) =>  y === 1 ? x.toUpperCase() : x);
example: ----h----e----l----L----O|

withLatestFrom 會在 main 送出值的時候執行 callback,但請註意如果 main 送出值時 some 之前沒有送出過任何值 callback 仍然不會執行!

這裡我們在 main 送出值時,去判斷 some 最後一次送的值是不是 1 來決定是否要切換大小寫,執行步驟如下

  • main 送出了 h,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 h。
  • main 送出了 e,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 e。
  • main 送出了 l,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 l。
  • main 送出了 l,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 L。
  • main 送出了 o,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 O。

withLatestFrom 很常用在一些 checkbox 型的功能,例如說一個編輯器,我們開啟粗體後,打出來的字就都要變粗體,粗體就像是 some observable,而我們打字就是 main observable。

scan

scan 其實就是 Observable 版本的 reduce 只是命名不同。如果熟悉數組操作的話,應該會知道原生的 JS Array 就有 reduce 的方法,使用方式如下

var arr = [1, 2, 3, 4];
var result = arr.reduce((origin, next) => { 
    console.log(origin)
    return origin + next
}, 0);
console.log(result)
// 0
// 1
// 3
// 6
// 10

reduce 方法需要傳兩個參數,第一個是 callback 第二個則是起始狀態,這個 callback 執行時,會傳入兩個參數一個是原本的狀態,第二個是修改原本狀態的參數,最後回傳一個新的狀態,再繼續執行。

所以這段代碼是這樣執行的

  • 第一次執行 callback 起始狀態是 0 所以 origin 傳入 0,next 為 arr 的第一個元素 1,相加之後變成 1 回傳並當作下一次的狀態。
  • 第二次執行 callback,這時原本的狀態(origin)就變成了 1,next 為 arr 的第二個元素 2,相加之後變成 3 回傳並當作下一次的狀態。
  • 第三次執行 callback,這時原本的狀態(origin)就變成了 3,next 為 arr 的第三個元素 3,相加之後變成 6 回傳並當作下一次的狀態。
  • 第三次執行 callback,這時原本的狀態(origin)就變成了 6,next 為 arr 的第四個元素 4,相加之後變成 10 回傳並當作下一次的狀態。
  • 這時 arr 的元素都已經遍歷過了,所以不會直接把 10 回傳。

scan 整體的運行方式都跟 reduce 一樣,示例如下

var source = Rx.Observable.from('hello')
         .zip(Rx.Observable.interval(600), (x, y) => x);
var example = source.scan((origin, next) => origin + next, '');
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// h
// he
// hel
// hell
// hello
// complete

畫成 Marble Diagram

source : ----h----e----l----l----o|
    scan((origin, next) => origin + next, '')
example: ----h----(he)----(hel)----(hell)----(hello)|

這裡可以看到第一次傳入 'h' 跟 '' 相加,返回 'h' 當作下一次的初始狀態,一直重覆下去。

scan 很常用在狀態的計算處理,最簡單的就是對一個數字的加減,我們可以綁定一個 button 的 click 事件,並用 map 把 click event 轉成 1,之後送處 scan 計算值再做顯示。

下麵一個小示例,來示範如何做最簡單的加減

const addButton = document.getElementById('addButton');
const minusButton = document.getElementById('minusButton');
const state = document.getElementById('state');

const addClick = Rx.Observable.fromEvent(addButton, 'click').mapTo(1);
const minusClick = Rx.Observable.fromEvent(minusButton, 'click').mapTo(-1);

const numberState = Rx.Observable.empty()
  .startWith(0)
  .merge(addClick, minusClick)
  .scan((origin, next) => origin + next, 0)

numberState
  .subscribe({
    next: (value) => { state.innerHTML = value;},
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡我們用了兩個 button,一個是 add 按鈕,一個是 minus 按鈕。

我把這兩個按鈕的點擊事件各建立了 addClcik, minusClick 兩個 observable,這兩個 observable 直接 mapTo(1) 跟 mapTo(-1),代表被點擊後會各自送出的數字!

接著我們用了 empty() 建立一個空的 observable 代表畫面上數字的狀態,搭配 startWith(0) 來設定初始值,接著用 merge 把兩個 observable 合併透過 scan 處理之後的邏輯,最後在 subscribe 來更改畫面的顯示。

buffer

buffer 是一整個家族,總共有五個相關的 operators

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen

這裡比較常用到的是 buffer, bufferCount 跟 bufferTime 這三個,我們直接來看示例。

var source = Rx.Observable.interval(300);
var source2 = Rx.Observable.interval(1000);
var example = source.buffer(source2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

畫成 Marble Diagram 則像是

source : --0--1--2--3--4--5--6--7..
source2: ---------0---------1--------...
            buffer(source2)
example: ---------([0,1,2])---------([3,4,5])

buffer 要傳入一個 observable(source2),它會把原本的 observable (source)送出的元素緩存在數組中,等到傳入的 observable(source2) 送出元素時,就會觸發把緩存的元素送出。

這裡的示例 source2 是每一秒就會送出一個元素,我們可以改用 bufferTime 簡潔的表達,如下

var source = Rx.Observable.interval(300);
var example = source.bufferTime(1000);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

除了用時間來作緩存外,我們更常用數量來做緩存,示例如下

var source = Rx.Observable.interval(300);
var example = source.bufferCount(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...

在示例上,我們可以用 buffer 來做某個事件的過濾,例如像是滑鼠連點才能真的執行,這裡我們一樣寫了一個小示例

const button = document.getElementById('demo');
const click = Rx.Observable.fromEvent(button, 'click')
const example = click
            .bufferTime(500)
            .filter(arr => arr.length >= 2);

example.subscribe({
    next: (value) => { console.log('success'); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡我們只有在 500 毫秒內連點兩下,才能成功印出 'success',這個功能在某些特殊的需求中非常的好用,也能用在批次處理來降低 request 傳送的次數

delay

delay 可以延遲 observable 一開始發送元素的時間點,示例如下

var source = Rx.Observable.interval(300).take(5);
var example = source.delay(500);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4

當然直接從 log 出的訊息看,是完全看不出差異的

讓我們直接看 Marble Diagram

source : --0--1--2--3--4|
        delay(500)
example: -------0--1--2--3--4|

從 Marble Diagram 可以看得出來,第一次送出元素的時間變慢了,雖然在這裡看起來沒什麼用,但是在 UI 操作上是非常有用的,這個部分我們最後示範。

delay 除了可以傳入毫秒以外,也可以傳入 Date 型別的資料,如下使用方式

var source = Rx.Observable.interval(300).take(5);
var example = source.delay(new Date(new Date().getTime() + 1000));
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

delayWhen

delayWhen 的作用跟 delay 很像,最大的差別是 delayWhen 可以影響每個元素,而且需要傳一個 callback 並回傳一個 observable,示例如下

var source = Rx.Observable.interval(300).take(5);
var example = source
              .delayWhen(
                  x => Rx.Observable.empty().delay(100 * x * x)
              );
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這時我們的 Marble Diagram 如下

source : --0--1--2--3--4|
    .delayWhen(x => Rx.Observable.empty().delay(100 * x * x));
example: --0---1----2-----3-----4|

這裡傳進來的 x 就是 source 送出的每個元素,這樣我們就能對每一個做延遲。

這裡我們用 delay 來做一個小功能,這個功能很簡單就是讓多張照片跟著滑鼠跑,但每張照片不能跑一樣快!

首先我們準備六張大頭照,並且寫進 HTML

<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover6.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover5.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover4.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover3.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover2.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover1.jpg" alt="">

用 CSS 把 img 改成圓形,並加上邊筐以及絕對位置

img{
  position: absolute;
  border-radius: 50%;
  border: 3px white solid;
  transform: translate3d(0,0,0);
}

再來寫 JS,一樣第一步先抓 DOM

var imgList = document.getElementsByTagName('img');

第二步建立 observable

var movePos = Rx.Observable.fromEvent(document, 'mousemove')
.map(e => ({ x: e.clientX, y: e.clientY }))

第三步撰寫邏輯

function followMouse(DOMArr) {
  const delayTime = 600;
  DOMArr.forEach((item, index) => {
    movePos
      .delay(delayTime * (Math.pow(0.65, index) + Math.cos(index / 4)) / 2)
      .subscribe(function (pos){
        item.style.transform = 'translate3d(' + pos.x + 'px, ' + pos.y + 'px, 0)';
      });
  });
}
followMouse(Array.from(imgList))

這裡我們把 imgList 從 Collection 轉成 Array 後傳入 followMouse(),並用 forEach 把每個 omg 取出並利用 index 來達到不同的 delay 時間,這個 delay 時間的邏輯大家可以自己想,最後 subscribe 就完成啦!

完整示例效果:
https://jsbin.com/hayixa/2/edit?html,css,js,output

debounce

跟 buffer、bufferTime 一樣,Rx 有 debounce 跟 debounceTime 一個是傳入 observable 另一個則是傳入毫秒,比較常用到的是 debounceTime,這裡我們直接來看一個示例

var source = Rx.Observable.interval(300).take(5);
var example = source.debounceTime(1000);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 4
// complete

這裡只印出 4 然後就結束了,因為 debounce 運行的方式是每次收到元素,他會先把元素 cache 住並等待一段時間,如果這段時間內已經沒有收到任何元素,則把元素送出;如果這段時間內又收到新的元素,則會把原本 cache 住的元素釋放掉並重新計時,不斷反覆。

以現在這個示例來講,我們每 300 毫秒就會送出一個數值,但我們的 debounceTime 是 1000 毫秒,也就是說每次 debounce 收到元素還等不到 1000 毫秒,就會收到下一個新元素,然後重新等待 1000 毫秒,如此重覆直到第五個元素送出時,observable 結束(complete)了,debounce 就直接送出元素。

以 Marble Diagram 表示如下

source : --0--1--2--3--4|
        debounceTime(1000)
example: --------------4|

debounce 會在收到元素後等待一段時間,這很適合用來處理間歇行為,間歇行為就是指這個行為是一段一段的,例如要做 Auto Complete 時,我們要打字搜尋不會一直不斷的打字,可以等我們停了一小段時間後再送出,才不會每打一個字就送一次 request!

const searchInput = document.getElementById('searchInput');
const theRequestValue = document.getElementById('theRequestValue');
Rx.Observable.fromEvent(searchInput, 'input')
  .debounceTime(300)
  .map(e => e.target.value)
  .subscribe((value) => {
    theRequestValue.textContent = value;
    // 在這裡發 request
  })

throttle

基本上每次看到 debounce 就會看到 throttle,他們兩個的作用都是要降低事件的觸發頻率,但行為上有很大的不同。

跟 debounce 一樣 RxJS 有 throttle 跟 throttleTime 兩個方法,一個是傳入 observable 另一個是傳入毫秒,比較常用到的也是 throttleTime,讓我們直接來看示例

var source = Rx.Observable.interval(300).take(5);
var example = source.throttleTime(1000);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 4
// complete

跟 debounce 的不同是 throttle 會先開放送出元素,等到有元素被送出就會沉默一段時間,等到時間過了又會開放發送元素。

throttle 比較像是控制行為的最高頻率,也就是說如果我們設定 1000 毫秒,那該事件頻率的最大值就是每秒觸發一次不會再更快,debounce 則比較像是必須等待的時間,要等到一定的時間過了才會收到元素。

throttle 更適合用在連續性行為,比如說 UI 動畫的運算過程,因為 UI 動畫是連續的,像我們之前在做拖拉時,就可以加上 throttleTime(12) 讓 mousemove event 不要發送的太快,避免畫面更新的速度跟不上樣式的切換速度。

distinct

它能幫我們把相同值的資料濾掉只留一筆,讓我們直接來看示例

var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'b'])
            .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct()

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete

如果用 Marble Diagram 表示如下

source : --a--b--c--a--b|
            distinct()
example: --a--b--c------|

從上面的示例可以看得出來,當我們用 distinct 後,只要有重覆出現的值就會被過濾掉。

另外我們可以傳入一個 selector callback function,這個 callback function 會傳入一個接收到的元素,並回傳我們真正希望比對的值,舉例如下

var source = Rx.Observable.from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }])
            .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct((x) => {
    return x.value
});

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete

這裡可以看到,因為 source 送出的都是實例,而 js 事件的比對是比對記憶體位置,所以在這個例子中這些實例永遠不會相等,但實際上我們想比對的是實例中的 value,這時我們就可以傳入 selector callback,來選擇我們要比對的值。

distinct 傳入的 callback 在 RxJS 5 幾個 bate 版本中有過很多改變,現在網路上很多文章跟教學都是過時的,請讀者務必小心!

實際上 distinct() 會在背地裡建立一個 Set,當接收到元素時會先去判斷 Set 內是否有相同的值,如果有就不送出,如果沒有則存到 Set 並送出。所以記得儘量不要直接把 distinct 用在一個無限的 observable 里,這樣很可能會讓 Set 越來越大,建議大家可以放第二個參數 flushes,或用 distinctUntilChanged

這裡指的 Set 其實是 RxJS 自己實現的,跟 ES6 原生的 Set 行為也都一致,只是因為 ES6 的 Set 支持程度還並不理想,所以這裡是直接用 JS 實現。

distinct 可以傳入第二個參數 flushes observable 用來清除暫存的資料,示例如下

var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'c'])
            .zip(Rx.Observable.interval(300), (x, y) => x);
var flushes = Rx.Observable.interval(1300);
var example = source.distinct(null, flushes);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// c
// complete

這裡我們用 Marble Diagram 比較好表示

source : --a--b--c--a--c|
flushes: ------------0---...
        distinct(null, flushes);
example: --a--b--c-----c|

其實 flushes observable 就是在送出元素時,會把 distinct 的暫存清空,所以之後的暫存就會從頭來過,這樣就不用擔心暫存的 Set 越來愈大的問題,但其實我們平常不太會用這樣的方式來處理,通常會用另一個方法 distinctUntilChanged。

distinctUntilChanged

distinctUntilChanged 跟 distinct 一樣會把相同的元素過濾掉,但 distinctUntilChanged 只會跟最後一次送出的元素比較,不會每個都比,舉例如下

var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
            .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinctUntilChanged()

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete

這裡 distinctUntilChanged 只會暫存一個元素,併在收到元素時跟暫存的元素比對,如果一樣就不送出,如果不一樣就把暫存的元素換成剛接收到的新元素並送出。

source : --a--b--c--c--b|
            distinctUntilChanged()
example: --a--b--c-----b|

從 Marble Diagram 中可以看到,第二個 c 送出時剛好上一個就是 c 所以就被濾掉了,但最後一個 b 則跟上一個不同所以沒被濾掉。

distinctUntilChanged 是比較常在開發中上使用的,最常見的狀況是我們在做多方同步時。當我們有多個 Client,且每個 Client 有著各自的狀態,Server 會再一個 Client 需要變動時通知所有 Client 更新,但可能某些 Client 接收到新的狀態其實跟上一次收到的是相同的,這時我們就可用 distinctUntilChanged 方法只處理跟最後一次不相同的訊息,像是多方通話、多裝置的資訊同步都會有類似的情境。

catch

catch 是很常見的非同步錯誤處理方法,在 RxJS 中也能夠直接用 catch 來處理錯誤,在 RxJS 中的 catch 可以回傳一個 observable 來送出新的值,讓我們直接來看示例:

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch(error => Rx.Observable.of('h'));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這個示例我們每隔 500 毫秒會送出一個字串(String),並用字串的方法 toUpperCase() 來把字串的英文字母改成大寫,過程中可能未知的原因送出了一個數值(Number) 2 導致發生例外(數值沒有 toUpperCase 的方法),這時我們在後面接的 catch 就能抓到錯誤。

catch 可以回傳一個新的 Observable、Promise、Array 或任何 Iterable 的事件,來傳送之後的元素。

以我們的例子來說最後就會在送出 X 就結束,畫成 Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|

這裡可以看到,當錯誤發生後就會進到 catch 並重新處理一個新的 observable,我們可以利用這個新的 observable 來送出我們想送的值。

也可以在遇到錯誤後,讓 observable 結束,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch(error => Rx.Observable.empty());

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

回傳一個 empty 的 observable 來直接結束(complete)。

另外 catch 的 callback 能接收第二個參數,這個參數會接收當前的 observalbe,我們可以回傳當前的 observable 來做到重新執行,示例如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch((error, obs) => obs);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡可以看到我們直接回傳了當前的 obserable(其實就是 example)來重新執行,畫成 Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..

因為是我們只是簡單的示範,所以這裡會一直無限迴圈,實務上通常會用在斷線重連的情境。

另上面的處理方式有一個簡化的寫法,叫做 retry()。

retry

如果我們想要一個 observable 發生錯誤時,重新嘗試就可以用 retry 這個方法,跟我們前一個講示例的行為是一致

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retry();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

通常這種無限的 retry 會放在即時同步的重新連接,讓我們在連線斷掉後,不斷的嘗試。另外我們也可以設定只嘗試幾次,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retry(1);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
}); 
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function

這裡我們對 retry 傳入一個數值 1,能夠讓我們只重覆嘗試 1 次後送出錯誤,畫成 Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
                retry(1)
example: ----a----b----c----d--------a----b----c----d----X|

這種處理方式很適合用在 HTTP request 失敗的場景中,我們可以設定重新發送幾次後,再秀出錯誤訊息。

retryWhen

RxJS 還提供了另一種方法 retryWhen,他可以把例外發生的元素放到一個 observable 中,讓我們可以直接操作這個 observable,並等到這個 observable 操作完後再重新訂閱一次原本的 observable。

這裡我們直接來看代碼

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retryWhen(errorObs => errorObs.delay(1000));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡 retryWhen 我們傳入一個 callback,這個 callback 有一個參數會傳入一個 observable,這個 observable 不是原本的 observable(example) 而是例外事件送出的錯誤所組成的一個 observable,我們可以對這個由錯誤所組成的 observable 做操作,等到這次的處理完成後就會重新訂閱我們原本的 observable。

這個示例我們是把錯誤的 observable 送出錯誤延遲 1 秒,這會使後面重新訂閱的動作延遲 1 秒才執行,畫成 Marble Diagram 如下

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----a----b----c----d----X|
        retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...

從上圖可以看到後續重新訂閱的行為就被延後了,但實務上我們不太會用 retryWhen 來做重新訂閱的延遲,通常是直接用 catch 做到這件事。這裡只是為了示範 retryWhen 的行為,實務上我們通常會把 retryWhen 拿來做錯誤通知或是例外收集,如下

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retryWhen(
                errorObs => errorObs.map(err => fetch('...')));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡的 errorObs.map(err => fetch('...')) 可以把 errorObs 里的每個錯誤變成 API 的發送,通常這裡個 API 會像是送訊息到公司的通訊頻道(Slack 等等),這樣可以讓工程師馬上知道可能哪個 API 掛了,這樣我們就能即時地處理。

retryWhen 實際上是在背地裡建立一個 Subject 並把錯誤放入,會在對這個 Subject 進行內部的訂閱,另外記得這個 observalbe 預設是無限的,如果我們把它結束,原本的 observable 也會跟著結束。

repeat

我們有時候可能會想要 retry 一直重覆訂閱的效果,但沒有錯誤發生,這時就可以用 repeat 來做到這件事,示例如下

var source = Rx.Observable.from(['a','b','c'])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat(1);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

// a
// b
// c
// a
// b
// c
// complete

這裡 repeat 的行為跟 retry 基本一致,只是 retry 只有在例外發生時才觸發,畫成 Marble Diagram 如下

source : ----a----b----c|
            repeat(1)
example: ----a----b----c----a----b----c|

同樣的我們可以不給參數讓他無限迴圈,如下

var source = Rx.Observable.from(['a','b','c'])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這樣我們就可以做動不斷重覆的行為,這個可以在建立輪詢時使用,讓我們不斷地發 request 來更新畫面。

最後我們來看一個錯誤處理在實際應用中的小示例

const title = document.getElementById('title');

var source = Rx.Observable.from(['a','b','c','d',2])
            .zip(Rx.Observable.interval(500), (x,y) => x)
            .map(x => x.toUpperCase()); 
            // 通常 source 會是建立即時同步的連線,像是 web socket

var example = source.catch(
                (error, obs) => Rx.Observable.empty()
                               .startWith('連線發生錯誤: 5秒後重連')
                               .concat(obs.delay(5000))
                 );

example.subscribe({
    next: (value) => { title.innerText = value },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這個示例其實就是模仿在即時同步斷線時,利用 catch 返回一個新的 observable,這個 observable 會先送出錯誤訊息並且把原本的 observable 延遲 5 秒再做合併,雖然這隻是一個模仿,但它清楚的展示了 RxJS 在做錯誤處理時的靈活性。

concatAll

concatAll 最重要的重點就是他會處理完前一個 observable 才會在處理下一個 observable,讓我們來看一個示例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// (點擊後)
// 0
// 1
// 2
// 3
// 4
// 5 ...

上面這段代碼,當我們點擊畫面時就會開始送出數值,如果用 Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     concatAll()
example: ----------------0----1----2----3----4--..

從 Marble Diagram 可以看得出來,當我們點擊一下 click 事件會被轉成一個 observable 而這個 observable 會每一秒送出一個遞增的數值,當我們用 concatAll 之後會把二維的 observable 攤平成一維的 observable,但 concatAll 會一個一個處理,一定是等前一個 observable 完成(complete)才會處理下一個 observable,因為現在送出 observable 是無限的永遠不會完成(complete),就導致他永遠不會處理第二個送出的 observable!

我們再看一個例子

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

現在我們把送出的 observable 限制只取前三個元素,用 Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \
                    \ ----0----1----2|   ----0----1----2|
                     ----0----1----2|
                     concatAll()
example: ----------------0----1----2----0----1----2--..

這裡我們把送出的 observable 變成有限的,只會送出三個元素,這時就能看得出來 concatAll 不管兩個 observable 送出的時間多麼相近,一定會先處理前一個 observable 再處理下一個。

switch

switch 同樣能把二維的 observable 攤平成一維的,但他們在行為上有很大的不同,我們來看下麵這個示例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.switch();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

用 Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: -----------------0----1----2--------0----1--...

switch 最重要的就是他會在新的 observable 送出後直接處理新的 observable 不管前一個 observable 是否完成,每當有新的 observable 送出就會直接把舊的 observable 退訂(unsubscribe),永遠只處理最新的 observable!

所以在這上面的 Marble Diagram 可以看得出來第一次送出的 observable 跟第二次送出的 observable 時間點太相近,導致第一個 observable 還來不及送出元素就直接被退訂了,當下一次送出 observable 就又會把前一次的 observable 退訂。

mergeAll

我們之前講過 merge 他可以讓多個 observable 同時送出元素,mergeAll 也是同樣的道理,它會把二維的 observable 轉成一維的,並且能夠同時處理所有的 observable,讓我們來看這個示例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上面這段代碼用 Marble Diagram 表示如下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: ----------------00---11---22---33---(04)4--...

從 Marble Diagram 可以看出來,所有的 observable 是並行(Parallel)處理的,也就是說 mergeAll 不會像 switch 一樣退訂(unsubscribe)原先的 observable 而是並行處理多個 observable。以我們的示例來說,當我們點擊越多下,最後送出的頻率就會越快。

另外 mergeAll 可以傳入一個數值,這個數值代表他可以同時處理的 observable 數量,我們來看一個例子

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.mergeAll(2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡我們送出的 observable 改成取前三個,並且讓 mergeAll 最多只能同時處理 2 個 observable,用 Marble Diagram 表示如下

click  : ---------c-c----------o----------.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o----------c----------..
                   \ \          \----0----1----2|     
                    \ ----0----1----2|  
                     ----0----1----2|
                     mergeAll(2)
example: ----------------00---11---22---0----1----2--..

當 mergeAll 傳入參數後,就會等處理中的其中一個 observable 完成,再去處理下一個。以我們的例子來說,前面兩個 observabel 可以被並行處理,但第三個 observable 必須等到第一個 observable 結束後,才會開始。

我們可以利用這個參數來決定要同時處理幾個 observable,如果我們傳入 1 其行為就會跟 concatAll 是一模一樣的,這點在原始碼可以看到他們是完全相同的。

concatMap

concatMap 其實就是 map 加上 concatAll 的簡化寫法,我們直接來看一個示例

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .concatAll();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上面這個示例就可以簡化成

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .concatMap(
                    e => Rx.Observable.interval(100).take(3)
                );

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

前後兩個行為是一致的,記得 concatMap 也會先處理前一個送出的 observable 在處理下一個 observable,畫成 Marble Diagram 如下

source : -----------c--c------------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-1-2-0-1-2---------...

這樣的行為也很常被用在發送 request 如下

function getPostData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.concatMap(
                    e => Rx.Observable.from(getPostData()));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡我們每點擊一下畫面就會送出一個 HTTP request,如果我們快速的連續點擊,大家可以在開發者工具的 network 看到每個 request 是等到前一個 request 完成才會送出下一個 request

concatMap 還有第二個參數是一個 selector callback,這個 callback 會傳入四個參數,分別是

  1. 外部 observable 送出的元素
  2. 內部 observable 送出的元素
  3. 外部 observable 送出元素的 index
  4. 內部 observable 送出元素的 index

回傳值我們想要的值,示例如下

function getPostData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.concatMap(
                e => Rx.Observable.from(getPostData()), 
                (e, res, eIndex, resIndex) => res.title);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這個示例的外部 observable 送出的元素就是 click event 實例,內部 observable 送出的元素就是 response 實例,這裡我們回傳 response 實例的 title 屬性,這樣一來我們就可以直接收到 title,這個方法很適合用在 response 要選取的值跟前一個事件或順位(index)相關時。

switchMap

switchMap 其實就是 map 加上 switch 簡化的寫法,如下

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .switch();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上面的代碼可以簡化成

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .switchMap(
                    e => Rx.Observable.interval(100).take(3)
                );

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

畫成 Marble Diagram 表示如下

source : -----------c--c-----------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0--0-1-2-----------...

只要註意一個重點 switchMap 會在下一個 observable 被送出後直接退訂前一個未處理完的 observable

另外我們也可以把 switchMap 用在發送 HTTP request

function getPostData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.switchMap(
                    e => Rx.Observable.from(getPostData()));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

如果我們快速的連續點擊五下,可以在開發者工具的 network 看到每個 request 會在點擊時發送

雖然我們發送了多個 request 但最後真正印出來的 log 只會有一個,代表前面發送的 request 已經不會造成任何的 side-effect 了,這個很適合用在只看最後一次 request 的情境,比如說 自動完成(auto complete),我們只需要顯示使用者最後一次打在畫面上的文字,來做建議選項而不用每一次的。

switchMap 跟 concatMap 一樣有第二個參數 selector callback 可用來回傳我們要的值,這部分的行為跟 concatMap 是一樣的,這裡就不再贅述。

mergeMap

mergeMap 其實就是 map 加上 mergeAll 簡化的寫法,如下

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .mergeAll();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

上面的代碼可以簡化成

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .mergeMap(
                    e => Rx.Observable.interval(100).take(3)
                );

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

畫成 Marble Diagram 表示

source : -----------c-c------------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-(10)-(21)-2----------...

記得 mergeMap 可以並行處理多個 observable,以這個例子來說當我們快速點按兩下,元素髮送的時間點是有機會重疊的

另外我們也可以把 switchMap 用在發送 HTTP request

function getPostData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.mergeMap(
                    e => Rx.Observable.from(getPostData()));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

如果我們快速的連續點擊五下,大家可以在開發者工具的 network 看到每個 request 會在點擊時發送並且會 log 出五個實例

mergeMap 也能傳入第二個參數 selector callback,這個 selector callback 跟 concatMap 第二個參數也是完全一樣的,但 mergeMap 的重點是我們可以傳入第三個參數,來限制並行處理的數量

function getPostData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.mergeMap(
                e => Rx.Observable.from(getPostData()), 
                (e, res, eIndex, resIndex) => res.title, 3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這裡我們傳入 3 就能限制,HTTP request 最多只能同時送出 3 個,並且要等其中一個完成在處理下一個

連續點按了五下,但第四個 request 是在第一個完成後才送出的,這個很適合用在特殊的需求下,可以限制同時發送的 request 數量。

RxJS 5 還保留了 mergeMap 的別名叫 flatMap,雖然官方文件上沒有,但這兩個方法是完全一樣的。請參考https://github.com/ReactiveX/RxJS/issues/333

switchMap, mergeMap, concatMap

這三個 operators 還有一個共同的特性,那就是這三個 operators 可以把第一個參數所回傳的 promise 實例直接轉成 observable,這樣我們就不用再用 Rx.Observable.from 轉一次,如下

function getPersonData() {
    return fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.concatMap(e => getPersonData());
                                    //直接回傳 promise 實例

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

至於在使用上要如何選擇這三個 operators? 其實都還是看使用情境而定,這裡筆者簡單列一下大部分的使用情境

  • concatMap 用在可以確定內部的 observable
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、kindedit編輯器 就是上面這樣的編輯輸入文本的一個編輯器 這也是一個插件。那麼怎麼用呢? 1、下載:百度kindedit 2、引入: 3、看官方提供的文檔 在addarticle.html中 當你把編輯器插入好的時候,你看似都可以設置大小,字體變粗等。。但是當你上傳圖片的時候就會想下麵一樣 ...
  • [toc] 功能和特性 基於socket實現的c/s架構的的通信 伺服器和客戶心跳連接 gson實現的消息通信機制 註冊及登錄 支持私聊和群聊。 動態更新用戶列表以及用戶消息提示 支持emoji表情,以及emoji表情選擇器 伺服器端資料庫用戶記錄 ~~實現文件傳輸~~ ~~文件記錄~~ 功能展示 ...
  • 寫什麼?為什麼寫?寫給誰看?這個是寫博客的首要問題。 寫什麼? 在這個分類下的文章主要是講一些博主對於JDK9的源碼理解結合一些入門級的數據結構與演算法。引用和知識來源主要來源於 CLRS(演算法導論),Alogrithms( 演算法 普林斯頓大學教材 鏈接:https://algs4.cs.prince ...
  • 文章目錄 ★引子 ★求導 ★最初的想法 ★初步的想法 ★後來的想法 ★最後的想法 ★編程範式 ★結尾 首先聲明一點,本文主要介紹的是面向對象(OO)的思想,順便談下函數式編程,而不是教你如何準確地、科學地用java求出函數在一點的導數。 ★引子 首先,直接上一段python代碼,請大家先分析下上面代 ...
  • 熔斷與降級 為什麼在RPC環節中有熔斷以及降級的需求,詳細的原因這裡不多解釋,從網上搜索一張圖做示意。 熔斷 我理解熔段主要解決如下幾個問題: 當所依賴的對象不穩定時,能夠起到快速失敗的目的 快速失敗後,能夠根據一定的演算法動態試探所依賴對象是否恢復 比如產品詳細頁獲取產品的好評總數時,由於後端服務異 ...
  • 你可能會疑惑為什麼我們使用6位數來表示一種顏色而不是只用一位或二位,答案是使用6位數可提供給我們巨大數量的顏色變化。 會有多少種可能的顏色?16 個值(0~F)和 6 個位置意味著我們有 16 的 6 次方,或者說超過 1600 萬種可能的顏色。 Hex code 遵循 red-green-blue ...
  • 很多情況下,你會使用 CSS 庫,這些庫可能會意外覆蓋掉你自己的 CSS。所以當你需要確保某元素具有指定的 CSS 時,你可以使用 !important。 Hello World!是粉色的 ...
  • 1、js的簡介 (1)js是什麼? js是可以嵌入到html中,是基於對象和事件驅動的腳本語言。 特點: 交互性 安全性:js不能訪問本地磁碟 跨平臺:瀏覽器中都具備js解析器 (2)js能做什麼 js能動態的修改(增刪)html和css的代碼 能動態的校驗數據 (3)js的歷史及組成 BOM(瀏覽 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...