RxJS v6 學習指南

来源:https://www.cnblogs.com/ang-/archive/2018/08/21/9514430.html
-Advertisement-
Play Games

為什麼要使用 RxJS RxJS 是一套處理非同步編程的 API,那麼我將從非同步講起。 前端編程中的非同步有:事件(event)、AJAX、動畫(animation)、定時器(timer)。 非同步常見的問題 回調地獄(Callback Hell) 競態條件(Race Condition) 記憶體泄漏(Me ...


為什麼要使用 RxJS

RxJS 是一套處理非同步編程的 API,那麼我將從非同步講起。

前端編程中的非同步有:事件(event)、AJAX、動畫(animation)、定時器(timer)。

非同步常見的問題

  • 回調地獄(Callback Hell)
  • 競態條件(Race Condition)
  • 記憶體泄漏(Memory Leak)
  • 管理複雜狀態(Manage Complex States)
  • 錯誤處理(Exception Handling)

回調地獄就是指層層嵌套的回調函數,造成代碼難以理解,並且難以協調組織複雜的操作。

競態條件出現的原因是無法保證非同步操作的完成會和他們開始時的順序一樣,因此最終結果不可控。比如常見的 AutoComplete 效果,每次輸入後向後端發送請求獲取結果展示在搜索框下麵,由於網路、後端數據查詢等原因有可能出現最後發送的請求比之前的請求更快地完成了,這時最終展現的並不是最後那個請求的結果,而這並不是我們所希望的。

這裡說的記憶體泄漏指的是單頁應用切換頁面時由於忘記在合適的時機移除監聽事件造成的記憶體泄漏。

非同步帶來了狀態的改變,可能會使狀態管理變得非常複雜,尤其是某個狀態有多個來源時,比如有些應用,一開始有一個預設值,再通過 AJAX 獲取初始狀態,存儲在 localStorage,之後通過 WebSocket 獲取更新。這時查詢狀態可能是同步或者非同步的,狀態的變更可能是主動獲取也可能是被動推送的,如果還有各種排序、篩選,狀態管理將會更加複雜。

JavaScript 中的 try/catch 只能捕獲同步的錯誤,非同步的錯誤不易處理。

Promise

使用 Promise 可以減輕一些非同步問題,如將回調函數變為串列的鏈式調用,統一同步和非同步代碼等,async/await 中也可以使用 try/catch 來捕獲錯誤。但是對於複雜的場景,仍然難於處理。而且 Promise 還有其他的問題,一是只有一個結果,二是不可以取消。

非同步 API:

非同步編程時不僅要面對這些問題,還有下麵這些使用方式各異的 API:

  • DOM Events
  • XMLHttpRequest
  • fetch
  • WebSocket
  • Service Worker
  • setTimeout
  • setInterval
  • requestAnimationFrame

而如果使用 RxJS,可以用統一的 API 來進行處理,而且藉助 RxJS 各種強大的操作符,我們可以更簡單地實現我們的需求。

認識 RxJS

什麼是 RxJS

我們都知道 JS 是什麼,那麼什麼是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的簡稱,指的是實踐響應式編程的一套工具,Rx 官網首頁的介紹是一套通過可監聽流來做非同步編程的 API(An API for asynchronous programming with observable streams)。

Rx 最早是由微軟開發的 LinQ 擴展出來的開源項目,之後由開源社區維護,有多種語言的實現,如 Java 的 RxJava,Python 的 RxPY 等,而 RxJS 就是 Rx 的 JavaScript 語言實現。

RxJS 的兩種編程思想

RxJS 引入了兩種重要的編程思想:函數式編程和響應式編程。

函數式編程(Functional Programming,簡稱 FP)是一種編程範式,強調使用函數來思考問題、編寫代碼。

In computer science, functional programming is a programming paradigm—a style of building the structure and elements of computer programs—that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data.

函數式編程的主要設計點在於避免使用狀態和可變的數據,即 stateless and immutable。

函數式編程對函數的使用有一些特殊要求:

  • 聲明式(Declarative)
  • 純函數(Pure Function)
  • 數據不可變性(Immutability)

聲明式的函數,讓開發者只需要表達”想要做什麼”,而不需要表達“怎麼去做”。

純函數指的是執行結果由輸入參數決定,參數相同時結果相同,不受其他數據影響,並且不會帶來副作用(Side Effect)的函數。副作用指的是函數做了和本身運算返回值沒有關係的事情,如修改外部變數或傳入的參數對象,甚至是執行 console.log 都算是 Side Effect。前端中常見的副作用有發送 http 請求、操作 DOM、調用 alert 或者 confirm 函數等。滿足純函數的特性也叫做引用透明度(Referential Transparency)。

數據不可變就是指這個數據一旦產生,它的值就永遠不會變。JavaScript 中字元串類型和數字類型就是不可改變的,而對象基本都是可變的,可能會帶來各種副作用。現在有各種庫可以實現 Immutable 特性,如 immutable.jsimmer.js

中文維基上說響應式編程(Reactive Programming)是一種面向數據流(stream)和變化傳播的編程範式。個人的理解是對數據流進行編程的一種編程範式,使用各種函數創建、組合、過濾數據流,然後通過監聽這個數據流來響應它的變化。響應式編程抽象出了流這個概念,提高了代碼的抽象級別,我們不用去關心大量的實現細節,而專註於對數據流的操作。

響應式流可以認為是隨著時間發出的一系列元素。響應式和觀察者模式有點相似,訂閱者訂閱後,發佈者吐出數據時,訂閱者會響應式進行處理。實際上Rx 組合了觀察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函數式編程。

RxJS 是上面兩種編程思想的結合,但是對於它是不是函數響應式編程(FRP)有比較大的爭議,因為它雖然既是函數式又是響應式但是不符合早期 FRP 的定義。

RxJS 的特點

  • 數據流抽象了很多現實問題
  • 擅長處理非同步問題
  • 把複雜問題分解為簡單問題的組合

前端中的 DOM 事件、WebSocket 推送消息、AJAX 請求資源、動畫都可以看作是數據流。

RxJS 對數據採用“推”的方式,當一個數據產生時,會將其推送給對應的處理函數,這個處理函數不用關心數據時同步產生還是非同步產生的,因此處理非同步將會變得非常簡單。

RxJS 中很多操作符,每個操作符都提供了一個小功能,學習 RxJS 最重要的就是學習如何組合操作符來解決複雜問題。

RxJS 入門

RxJS 使用

RxJS 倉庫現在移到了 ReactiveX 組織下,最新的大版本為 6,與之前的版本相比有許多破壞性變更,請註意。

RxJS 的 import 路徑有以下 5 種:

  1. 創建 Observable 的方法、types、schedulers 和一些工具方法

    import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';

  2. 操作符 operators

    import { map, filter, scan } from 'rxjs/operators';

  3. webSocket

    import { webSocket } from 'rxjs/webSocket';

  4. ajax

    import { ajax } from 'rxjs/ajax';

  5. 測試

    import { TestScheduler } from 'rxjs/testing';

本文所有 demo 均在 v6.2.1 中測試過

一個簡單的例子

import { fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';

const eleBtn = document.querySelector('#btn')
const click$ = fromEvent(eleBtn, 'click')

click$.pipe(take(1))
  .subscribe(e => {
    console.log('只可點擊一次')
    eleBtn.setAttribute('disabled', '')
  })

這裡演示了 RxJS 的大概用法,通過 fromEvent 將點擊事件轉換為 RxJS 的 Observable (響應式數據流),take(1) 表示只操作一次,觀察者通過訂閱(subscribe)來響應變化。具體 API 的使用會在後面講到。

演示地址

代表流的變數用 $ 符號結尾,是 RxJS 中的一種慣例。

RxJS 要點

RxJS 有一個核心和三個重點,一個核心是 Observable 再加上相關的 Operators,三個重點分別是 Observer、Subject、Schedulers。

什麼是 Observable

個人認為在文檔中說的 Observable 更確切的說法是 Observable Stream,也就是 Rx 的響應式數據流。

在 RxJS 中 Observable 是可被觀察者,觀察者則是 Observer,它們通過 Observable 的 subscribe 方法進行關聯。

前面提到了 RxJS 結合了觀察者模式和迭代器模式。

對於觀察者模式,我們其實比較熟悉了,比如各種 DOM 事件的監聽,也是觀察者模式的一種實踐。核心就是發佈者發佈事件,觀察者選擇時機去訂閱(subscribe)事件。

在 ES6 中,Array、String 等可遍歷的數據結構原生部署了迭代器(Iterator )介面。

const numbers = [1, 2, 3]
const iterator = numbers[Symbol.iterator]()
iterator.next() // {value: 1, done: false}
iterator.next() // {value: 2, done: false}
iterator.next() // {value: 3, done: false}
iterator.next() // {value: undefined, done: true}

觀察者模式和迭代器模式的相同之處是兩者都是漸進式使用數據的,只不過從數據使用者的角度來說,觀察者模式數據是推送(push)過來的,而迭代器模式是自己去拉取(pull)的。Rx 中的數據是 Observable 推送的,觀察者不需要主動去拉取。

Observable 與 Array 相當類似,都可以看作是 Collection,只不過 Observable 是 a collection of items over time,是隨時間發出的一序列元素,所以下麵我們會看到 Observable 的一些操作符與 Array 的方法極其相似。

創建 Observable

要創建一個 Observable,只要給 new Observable 傳遞一個接收 observer 參數的回調函數,在這個函數中去定義如何發送數據。

import { Observable } from 'rxjs';

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

上面的代碼通過 new Observable 創建了一個 Observable,調用它的 subscribe 方法進行訂閱,執行結果為依次輸出 'start',1,2,3,'end'。

下麵我們再看一個非同步的例子:

import { Observable } from 'rxjs';
    
const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

先輸出 ’start' 、'end',然後每隔 1000 ms 輸出一個遞增的數字。

通過這兩個小例子,我們知道 RxJS 既能處理同步的行為,也能處理非同步的。

觀察者 Observer

觀察者 Observer 是一個有三個方法的對象:

  • next: 當 Observable 發出新的值時被調用,接收這個值作為參數
  • complete:當 Observable 完結,沒有更多數據時被調用。complete 之後,next 方法無效
  • error:當 Observable 內部發生錯誤時被調用,之後不會調用 complete,next 方法無效

    const source$ = new Observable(observer => {
      observer.next(1)
      observer.next(2)
      observer.complete()
      observer.next(3)
    })
    
    const observer = {
      next: item => console.log(item),
      complete: () => console.log('complete')
    }
    
    source$.subscribe(observer)

上面的代碼會輸出 1,2,'complete',而不會輸出 3。

const source$ = new Observable(observer => {
  try {
    observer.next(1)
    observer.next(2)
    throw new Error('there is an exception')
    observer.complete()
  } catch (e) {
    observer.error(e)
  }
})

const observer = {
  next: item => console.log(item),
  error: e => console.log(e),
  complete: () => console.log('complete')
}

source$.subscribe(observer)

註意 error 之後不會再調用 complete。

Observer 還有簡單形式,即不用構建一個對象,而是直接把函數作為 subscribe 方法的參數。

source$.subscribe(
  item => console.log(item),
  e => console.log(e),
  () => console.log('complete')
)

參數依次為 next 、error、complete,後面兩個參數可以省略。

延遲執行(lazy evaluation)

我們傳給 new Observable 的回調函數如果沒有訂閱是不會執行的,訂閱一個 Observable 就像是執行一個函數,和下麵的函數類似。這和我們常見的那種內部保存有觀察者列表的觀察者模式是不同的,Observable 內部沒有這個觀察者列表。

function subscribe (observer) {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
}

subscribe({
    next: item => console.log(item),
    error: e => console.log(e),
    complete: () => console.log('complete')
})

退訂(unsubscribe)

觀察者想退訂,只要調用訂閱返回的對象的 unsubscribe 方法,這樣觀察者就再也不會接受到 Observable 的信息了。

const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

const subscription = source$.subscribe(observer)

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)

操作符

在 RxJS 中,操作符是用來處理數據流的。我們往往需要對數據流做一系列處理,才交給 Observer,這時一個操作符就像一個管道一樣,數據進入管道,完成處理,流出管道。

import { interval } from 'rxjs';
import { map } from 'rxjs/operators'
    
const source$ = interval(1000).pipe(
  map(x => x * x)
)

source$.subscribe(x => console.log(x))

interval 操作符創造了一個數據流,interval(1000) 會產生一個每隔 1000 ms 就發出一個從 0 開始遞增的數據。map 操作符和數組的 map 方法類似,可以對數據流進行處理。具體見演示地址

這個 map 和數組的 map 方法會產生新的數組類似,它會產生新的 Observable。每一個操作符都會產生一個新的 Observable,不會對上游的 Observable 做任何修改,這完全符合函數式編程“數據不可變”的要求。

上面的 pipe 方法就是數據管道,會對數據流進行處理,上面的例子只有一個 map 操作符進行處理,可以添加更多的操作符作為參數。

彈珠圖

彈珠圖(Marble diagrams)就是用圖例形象地表示 Observable 和各種操作符的一種方法。

用 - 表示一小段時間,X 代表有錯誤發生, | 表示結束,() 表示同步發生。

上面的例子可以如下表示:

source: -----0-----1-----2-----3--...
        map(x => x * x)
newest: -----0-----1-----4-----9--...

具體關於彈珠圖的使用可以查看這個網站http://rxmarbles.com/

創建 Observable

創建 Observable 的這些方法就是用來創建 Observable 數據流的,註意和操作符不同,它們是從 rxjs 中導入的,而不是 rxjs/operators

of 方法

之前我們寫的這種形式:

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.complete()
})

使用 of 方法將會非常簡潔:

import {of} from 'rxjs'
const source$ = of(1, 2, 3)

from 方法

上面的代碼用 from 則是這樣:

import {from} from 'rxjs'
const source$ = from([1, 2, 3])

from 可以將可遍歷的對象(iterable)轉化為一個 Observable,字元串也部署有 iterator 介面,所以也支持。

from 還可以根據 promise 創建一個 Observable。我們用 fetch 或者 axios 等類庫發送的請求都是一個 promise 對象,我們可以使用 from 將其處理為一個 Observable 對象。

fromEvent 方法

用 DOM 事件創建 Observable,第一個參數為 DOM 對象,第二個參數為事件名稱。具體示例見前面 RxJS 入門章節的一個簡單例子。

fromEventPattern 方法

將添加事件處理器、刪除事件處理器的 API 轉化為 Observable。

function addClickHandler (handler) {
  document.addEventListener('click', handler)
}
 
function removeClickHandler (handler) {
  document.removeEventListener('click', handler)
}
 
fromEventPattern(
  addClickHandler,
  removeClickHandler
).subscribe(x => console.log(x))

也可以是我們自己實現的和事件類似,擁有註冊監聽和移除監聽的 API。

import { fromEventPattern } from 'rxjs'

class EventEmitter {
  constructor () {
    this.handlers = {}
  }
  on (eventName, handler) {
    if (!this.handlers[eventName]) {
      this.handlers[eventName] = []
    }
    if(typeof handler === 'function') {
        this.handlers[eventName].push(handler)
    } else {
        throw new Error('handler 不是函數!!!')
    }
  }
  off (eventName, handler) {
    this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1)
  }
  emit (eventName, ...args) {
    this.handlers[eventName].forEach(handler => {
      handler(...args)
    })
  }
}

const event = new EventEmitter()

const subscription = fromEventPattern(
  event.on.bind(event, 'say'), 
  event.off.bind(event, 'say')
).subscribe(x => console.log(x))

let timer = (() => {
  let number = 1
  return setInterval(() => {
    if (number === 5) {
      clearInterval(timer)
      timer = null
    }
    event.emit('say', number++)
  }, 1000)
})()

setTimeout(() => {
  subscription.unsubscribe()
}, 3000)

演示地址

interval、timer

interval 和 JS 中的 setInterval 類似,參數為間隔時間,下麵的代碼每隔 1000 ms 會發出一個遞增的整數。

interval(1000).subscribe(console.log)
// 0
// 1
// 2
// ...

timer 則可以接收兩個參數,第一個參數為發出第一個值需要等待的時間,第二個參數為之後的間隔時間。第一個參數可以是數字,也可以是一個 Date 對象,第二個參數可省。

range

操作符 of 產生較少的數據時可以直接寫如 of(1, 2, 3),但是如果是 100 個呢?這時我們可以使用 range 操作符。

range(1, 100) // 產生 1 到 100 的正整數

empty、throwError、never

empty 是創建一個立即完結的 Observable,throwError 是創建一個拋出錯誤的 Observable,never 則是創建一個什麼也不做的 Observable(不完結、不吐出數據、不拋出錯誤)。這三個操作符單獨用時沒有什麼意義,主要用來與其他操作符進行組合。目前官方不推薦使用 empty 和 never 方法,而是推薦使用常量 EMPTY 和 NEVER(註意不是方法,已經是一個 Observable 對象了)。

defer

defer 創建的 Observable 只有在訂閱時才會去創建我們真正想要操作的 Observable。defer 延遲了創建 Observable,而又有一個 Observable 方便我們去訂閱,這樣也就推遲了占用資源。

defer(() => ajax(ajaxUrl))

只有訂閱了才會去發送 ajax 請求。

操作符

操作符其實看作是處理數據流的管道,每個操作符實現了針對某個小的具體應用問題的功能,RxJS 編程最大的難點其實就是如何去組合這些操作符從而解決我們的問題。

在 RxJS 中,有各種各樣的操作符,有轉化類、過濾類、合併類、多播類、錯誤處理類、輔助工具類等等。一般不需要自己去實現操作符,但是我們需要知道操作符是一個函數,實現的時候必須考慮以下功能:

  1. 返回一個全新的 Observable 對象
  2. 對上游和下游的訂閱和退訂處理
  3. 處理異常情況
  4. 及時釋放資源

pipeable 操作符

之前版本的 RxJS 各種操作符都掛載到了全局 Observable 對象上,可以這樣鏈式調用:

source$.filter(x => x % 2 === 0).map(x => x * 2)

現在需要這樣使用:

import {filter, map} from 'rxjs/operators'

source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 2)
)

其實也很好理解,pipe 就是管道的意思,數據流通過操作符處理,流出然後交給下一個操作符。

幾個類似數組方法的基礎操作符

map、filter 和數組的 map、filter 方法類似,scan 則是和 reduce 方法類似,mapTo 是將所有發出的數據映射到一個給定的值。

import {mapTo} from 'rxjs/operators'

fromEvent(document, 'click').pipe(
  mapTo('Hi')
).subscribe(x => console.log(x))

每次點擊頁面時都會輸出 Hi。

一些過濾的操作符

  • take 是從數據流中選取最先發出的若幹數據
  • takeLast 是從數據流中選取最後發出的若幹數據
  • takeUntil 是從數據流中選取直到發生某種情況前發出的若幹數據
  • first 是獲得滿足判斷條件的第一個數據
  • last 是獲得滿足判斷條件的最後一個數據
  • skip 是從數據流中忽略最先發出的若幹數據
  • skipLast 是從數據流中忽略最後發出的若幹數據

    import { interval } from 'rxjs';
    import { take } from 'rxjs/operators';
    
    interval(1000).pipe(
      take(3)
    ).subscribe(
      x => console.log(x),
      null,
      () => console.log('complete')
    )
    // 0
    // 1
    // 2
    // 'complete'

使用了 take(3),表示只取 3 個數據,Observable 就進入完結狀態。

import { interval, fromEvent } from 'rxjs'
import { takeUntil } from 'rxjs/operators'

interval(1000).pipe(
  takeUntil(fromEvent(document.querySelector('#btn'), 'click'))
).subscribe(
  x => { document.querySelector('#time').textContent = x + 1 },
  null,
  () => console.log('complete')
)

這裡有一個 interval 創建的數據流一直在發出數據,直到當用戶點擊按鈕時停止計時,見演示

合併類操作符

合併類操作符用來將多個數據流合併。

1)concat、merge

concat、merge 都是用來把多個 Observable 合併成一個,但是 concat 要等上一個 Observable 對象 complete 之後才會去訂閱第二個 Observable 對象獲取數據並把數據傳給下游,而 merge 時同時處理多個 Observable。使用方式如下:

import { interval } from 'rxjs'
import { merge, take } from 'rxjs/operators'

interval(500).pipe(
  take(3),
  merge(interval(300).pipe(take(6)))
).subscribe(x => console.log(x))

可以點此去比對效果,concat 的結果應該比較好理解,merge 藉助彈珠圖也比較好理解,它是在時間上對數據進行了合併。

source : ----0----1----2|
source2: --0--1--2--3--4--5|
            merge()
example: --0-01--21-3--(24)--5|

merge 的邏輯類似 OR,經常用來多個按鈕有部分相同行為時的處理。

註意最新的官方文檔RxJS v5.x 到 6 的更新指南中指出不推薦使用 merge、concat、combineLatest、race、zip 這些操作符方法,而是推薦使用對應的靜態方法。

將上面的 merge 改成從 rxjs 中導入,使用方式變成了合併多個 Observable,而不是一個 Observable 與其他 Observable 合併。

import { interval,merge } from 'rxjs'
import { take } from 'rxjs/operators'

merge(
  interval(500).pipe(take(3)),
  interval(300).pipe(take(6))
).subscribe(x => console.log(x))

2)concatAll、mergeAll、switchAll

用來將高階的 Observable 對象壓平成一階的 Observable,和 loadash 中壓平數組的 flatten 方法類似。concatAll 會對內部的 Observable 對象做 concat 操作,和 concat 操作符類似,如果前一個內部 Observable 沒有完結,那麼 concatAll 不會訂閱下一個內部 Observable,mergeAll 則是同時處理。switchAll 比較特殊一些,它總是切換到最新的內部 Observable 對象獲取數據。上游高階 Observable 產生一個新的內部 Observable 時,switchAll 就會立即訂閱最新的內部 Observable,退訂之前的,這也就是 ‘switch’ 的含義。

import { interval } from 'rxjs';
import { map, switchAll, take } from 'rxjs/operators';

interval(1500).pipe(
  take(2),
  map(x => interval(1000).pipe(
    map(y => x + ':' + y), 
    take(2))
  ),
  switchAll()
).subscribe(console.log)

// 0:0
// 1:0
// 1:1

內部第一個 Observable 對象的第二個數據還沒來得及發出,第二個 Observable 對象就產生了。

3)concatMap、mergeMap、switchMap

從上面的例子我們也可以看到高階 Observable 常常是由 map 操作符將每個數據映射為 Observable 產生的,而我們訂閱的時候需要將其壓平為一階 Observable,而就是要先使用 map 操作符再使用 concatAll 或 mergeAll 或 switchAll 這些操作符中的一個。RxJS 中提供了對應的更簡潔的 API。使用的效果可以用下麵的公式表示:

concatMap = map + concatAll
mergeMap = map + mergeAll
switchMap = map + switchAll

4)zip、combineLatest、withLatestFrom

zip 有拉鏈的意思,這個操作符和拉鏈的相似之處在於數據一定是一一對應的。

import { interval } from 'rxjs';
import { zip, take } from 'rxjs/operators';
const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  zip(newest$, (x, y) => x + y)
).subscribe(x => console.log(x))
// 0
// 2
// 4

zip 是內部的 Observable 都發出相同順序的數據後才交給下游處理,最後一個參數是可選的 resultSelector 參數,這個函數用來處理操作符的結果。上面的示例運行過程如下:

  1. newest 發出第一個值 0,但這時 source 還沒有發出第一個值,所以不執行 resultSelector 函數也不會像下游發出數據
  2. source 發出第一個值 0,此時 newest 之前已發出了第一個值 0,執行 resultSelector 函數得到結果 0,發出這個結果
  3. newest 發出第二個值 1,但這時 source 還沒有發出第二個值,所以不執行 resultSelector 函數也不會像下游發出數據
  4. newest 發出第三個值 2,但這時 source 還沒有發出第三個值,所以不執行 resultSelector 函數也不會像下游發出數據
  5. source 發出第二個值 1,此時 newest 之前已發出了第一個值 1,執行 resultSelector 函數得到結果 2,發出這個結果
  6. newest 發出第四個值 3,但這時 source 還沒有發出第四個值,所以不執行 resultSelector 函數也不會像下游發出數據
  7. source 發出第三個值 2,此時 newest 之前已發出了第一個值 2,執行 resultSelector 函數得到結果 4,發出這個結果
  8. source 完結,不可能再有對應的數據了,整個 Observable 完結

上面如果沒有傳遞最後一個參數 resultSelector 函數,將會依次輸出數組 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推薦使用 resultSelector 參數,將會在 v7 中移除。加上之前提到的推薦使用靜態方法,這個示例應該改成這樣:

import { interval, zip } from 'rxjs';
import { take, map } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

const add = (x, y) => x + y

zip(source$, newest$).pipe(
  map(x => add(...x))
).subscribe(x => console.log(x))

使用 zip 當有數據流吐出數據很快,而有數據流發出值很慢時,要小心數據積壓的問題。這時快的數據流已經發出了很多數據,由於對應的數據還沒發出,RxJS 只能保存數據,快的數據流不斷地發出數據,積壓的數據越來越多,消耗的記憶體也會越來越大。

combineLatest 與 zip 不同,只要其他的 Observable 已經發出過值就行,顧名思義,就是與其他 Observable 最近發出的值結合。

import { interval, combineLatest } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

combineLatest(source$, newest$).subscribe(x => console.log(x))
// [0, 0]
// [0, 1]
// [0, 2]
// [1, 2]
// [1, 3]
// [2, 3]
// [2, 4]
// [2, 5]

withLatestFrom 沒有靜態方法,只有操作符方法,前面的方法所有 Observable 地位是平等的,而這個方法是使用這個操作符的 Observable 起到了主導作用,即只有它發出值才會進行合併產生數據發出給下游。

import { interval } from 'rxjs';
import { take, withLatestFrom } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  withLatestFrom(newest$)
).subscribe(x => console.log(x))
// [0, 0]
// [1, 2]
// [2, 4]
  1. source 發出 0 時,newest 最新發出的值為 0,結合為 [0, 0] 發出
  2. source 發出 1,此時 newest 最新發出的值為 2,結合為 [1, 2] 發出
  3. source 發出 2,此時 newest 最新發出的值為 4,結合為 [2, 4] 發出
  4. source 完結,整個 Observable 完結

5)startWith、forkJoin、race

startWith 是在 Observable 的一開始加入初始數據,同步立即發送,常用來提供初始狀態。

import { fromEvent, from } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

const source$ = fromEvent(document.querySelector('#btn'), 'click')

let number = 0
const fakeRequest = x => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(number++)
    }, 1000)
  })
}

source$.pipe(
  startWith('initData'),
  switchMap(x => from(fakeRequest(x)))
).subscribe(x => document.querySelector('#number').textContent = x)

這裡通過 startWith 操作符獲取了頁面的初始數據,之後通過點擊按鈕獲取更新數據。

forkJoin 只有靜態方法形式,類似 Promise.all ,它會等內部所有 Observable 都完結之後,將所有 Observable 對象最後發出來的最後一個數據合併成 Observable。

race 操作符產生的 Observable 會完全鏡像最先吐出數據的 Observable。

const obs1 = interval(1000).pipe(mapTo('fast one'));
const obs2 = interval(3000).pipe(mapTo('medium one'));
const obs3 = interval(5000).pipe(mapTo('slow one'));

race(obs3, obs1, obs2)
.subscribe(
  winner => console.log(winner)
);

// result:
// a series of 'fast one'

一個小的練習

本文中的例子基本來自30 天精通 RxJS,使用 RxJS v6 版本進行重寫。

頁面上有一個 p 標簽存放一個狀態,初始為 0,有兩個按鈕,一個按鈕點擊後這個狀態增加 1,另一個按鈕點擊後這個狀態減少 1。

<button id="addButton">Add</button>
<button id="minusButton">Minus</button>
<p id="state"></p>

這兩個按鈕的點擊事件我們都可以建立響應式數據流,可以使用 mapTo(1) 和 mapTo(-1) 分別表示點擊後增加 1 和減少 1。我們可以使用 EMPTY 創建一個空的數據流來表示這個狀態,用 startWith 設定初始值。然後 merge 這兩個點擊的數據流,但是這還有一個問題,點擊事件的數據流需要與表示狀態的數據流進行邏輯計算,發出最終的狀態,我們才能去訂閱這個最終的數據流來更改頁面的顯示。而這種累計計算的方法,可以用 scan 操作符來實現。最終實現如下:

import { fromEvent, EMPTY, merge } from 'rxjs'
import { mapTo, startWith, scan } from 'rxjs/operators'

const addButton = document.getElementById('addButton')
const minusButton = document.getElementById('minusButton')
const state = document.getElementById('state')

const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1))
const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1))

merge(
  EMPTY.pipe(startWith(0)),
  addClick$, 
  minusClick$)
.pipe(
  scan((origin, next) => origin + next)
).subscribe(item => {
  state.textContent = item
})

查看演示

簡單拖拽

頁面上有一個 id 為 drag 的 div:

<div id="drag"></div>

頁面 css:

html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}

#drag {
  position: absolute;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}

要實現的功能如下:

  1. 當在這個 div 上按下滑鼠左鍵(mousedown)時,開始監聽滑鼠移動(mousemove)位置
  2. 當滑鼠鬆開(mouseup)時,結束監聽滑鼠移動
  3. 當滑鼠移動被監聽時,更新 div 樣式來實現拖拽效果

實現思路:

  1. 我們可以使用 fromEvent 去轉化 DOM 事件

    const mouseDown$ = fromEvent(eleDrag, 'mousedown')
    const mouseMove$ = fromEvent(eleBody, 'mousemove')
    const mouseUp$ = fromEvent(eleBody, 'mouseup')
  2. 對於滑鼠按下這個數據流,每次滑鼠按下事件發生時都轉成滑鼠移動的數據流

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$)
    )
  3. 滑鼠鬆開時,結束監聽滑鼠移動,我們可以用 takeUntil 表示這個邏輯

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
  4. 上面的 map 操作符內將每次 mousedown 映射為一個 Observable,形成了高階 Observable,我們需要用 concatlAll 壓平,map 和 concatAll 連用,可以用更簡潔的 concatMap

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
  5. 訂閱這個 mousemove 數據流更新 div 位置。我們可以獲取 mousemove event 中的 clientX 和 clientY,減去初始滑鼠按下時滑鼠相對 div 元素的值來得到最終 div 的絕對位置的 left 和 top。也可以使用 withLatestFrom 操作符,見 demo

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        map(mouseMoveEvent => ({
          left: mouseMoveEvent.clientX - mouseDownEvent.offsetX,
          top: mouseMoveEvent.clientY - mouseDownEvent.offsetY
        })),
        takeUntil(mouseUp$)
      ))
    ).subscribe(position => {
      eleDrag.style.left = position.left + 'px'
      eleDrag.style.top = position.top + 'px'
    })

這裡是一個更複雜一些的例子,當頁面滑動到視頻出頁面時視頻 fixed 定位,這是可以拖拽移動視頻位置。通過 getValidValue 對視頻拖拽的位置進行了一個限制。

緩存

把上游的多個數據緩存起來,當時機合適時再把匯聚的數據傳給下游。

1)buffer、bufferTime、bufferCount、bufferWhen、bufferToggle

對於 buffer 這一組操作符,數據匯聚的形式就是數組。

buffer 接收一個 Observable 作為 notifier,當 notifier 發出數據時,將 緩存的數據傳給下游。

interval(300).pipe(
  take(30),
  buffer(interval(1000))
).subscribe(
  x => console.log(x)
)
// [0, 1, 2]
// [3, 4, 5]
// [6, 7, 8]
// [9, 10, 11, 12]

bufferTime 是用時間來控制時機,上面可以改成 bufferTime(1000)

bufferCount 是用數量來控制時機,如 3 個一組,bufferCount(3)

bufferWhen 接收一個叫做 closeSelector 的參數,它應該返回一個 Observable。通過這個 Observable 來控制緩存。這個函數沒有參數。下麵的方法等價於前面的 buffer:

interval(300).pipe(
  take(30),
  bufferWhen(() => {
    return interval(1000)
  })
).subscribe(
  x => console.log(x)
)

bufferToggle 和 buffer 的不同是可以不斷地控制緩存視窗的開和關,一個參數是一個 Observable,稱為 opening,第二個參數是稱為 closeSelector 的一個函數。這個函數的參數是 opening 產生的數據。前一個參數用來控制緩存的開始時間,後一個控制緩存的結束。與 bufferWhen 相比,它的 closeSelector 可以接收參數,控制性更強。

我們可以使用 buffer 來做事件的過濾,下麵的代碼只有 500ms 內連續點擊兩次以上才會輸出 ‘success’ 。

fromEvent(document.querySelector('#btn'), 'click').pipe(
  bufferTime(500),
  filter(arr => arr.length >= 2)
).subscribe(
  x => console.log('success')
)

2)window、windowTime、windowCount、windowWhen、windowToggle

與前面的 buffer 類似,不過 window 緩存數據匯聚的形式是 Observable,因此形成了高階 Observable。

debounceTime、throttleTime

類似 lodash 的 debounce 和 throttle,用來降低事件的觸發頻率。

我們做搜索時,常常要對輸入進行 debounce 來減少請求頻率。

fromEvent(document.querySelector('#searchInput'), 'input').pipe(
  debounceTime(300),
  map(e => e.target.value)
).subscribe(
  input => document.querySelector('#text').textContent = input
  // 發送請求
)

distinct、distinctUntilChanged

distinct 操作符可以用來去重,將上游重覆的數據過濾掉。

of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe(
  zip(interval(1000)),
  map(arr => arr[0]),
  distinct()
).subscribe(x => console.log(x))

上面的代碼只會輸出 1, 2, 3, 4

distinct 操作符還可以接收一個 keySelector 的函數作為參數,這是官網的一個 typescript 的例子:

interface Person {
  age: number,
  name: string
}

of<Person>(
  { age: 4, name: 'Foo' },
  { age: 7, name: 'Bar' },
  { age: 5, name: 'Foo' },
).pipe(
  distinct((p: Person) => p.name),
).subscribe(x => console.log(x))
 
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

distinctUntilChanged 也是過濾重覆數據,但是只會與上一次發出的元素比較。這個操作符比 distinct 更常用。distinct 要與之前發出的不重覆的值進行比較,因此要在內部存儲這些值,要小心記憶體泄漏,而 distinctUntilChanged 只用保存上一個的值。

dalay、delayWhen

用來延遲上游 Observable 數據的發出。

delay 可以接受一個數字(單位預設為 ms)或者 date 對象作為延遲控制。

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(delay(1000)) // 所有點擊事件延遲 1 秒
delayedClicks.subscribe(x => console.log(x))

我們前面介紹過 bufferWhen,dalayWhen 也帶有 when,在 RxJS 中,這種操作符它接收的參數都是 Observable Factory,即一個返回 Observable 對象的回調函數,用這個 Observable 來進行控制。

每個 click 都延遲 0 至 5 秒之間的任意一個時間:

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(
  delayWhen(event => interval(Math.random() * 5000)),
)
delayedClicks.subscribe(x => console.log(x))

異常錯誤處理

異常處理的難點:

  1. try/catch 只支持同步
  2. 回調函數容易形成回調地獄,而且每個回調函數的最開始都要判斷是否存在錯誤
  3. Promise 不能重試,而且不強制異常被捕獲

對錯誤處理的處理可以分為兩類,即恢復(recover)和重試(retry)。

恢復是雖然發生了錯誤但是讓程式繼續運行下去。重試,是認為這個錯誤是臨時的,重試嘗試發生錯誤的操作。實際中往往配合使用,因為一般重試是由次數限制的,當嘗試超過這個限制時,我們應該使用恢復的方法讓程式繼續下去。

1)catchError

catchError 用來在管道中捕獲上游傳遞過來的錯誤。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError(err => of(8))
).subscribe(x => console.log(x))
// 0
// 1
// 2
// 3
// 8

catchError 中的回調函數返回了一個 Observable,當捕獲到上游的錯誤時,調用這個函數,返回的 Observable 中發出的數據會傳遞給下游。因此上面當 x 為4 時發生了錯誤,會用 8 來替換。

catchError 中的回調函數除了接收錯誤對象為參數外,還有第二個參數 caught$ 表示上游的 Observable 對象。如果回調函數返回這個 Observable 對象,就會進行重試。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError((err, caught$) => caught$),
  take(20)
).subscribe(x => console.log(x))

這個代碼會依次輸出 5 次 0, 1, 2, 3。

2)retry

retry 可以接收一個整數作為參數,表示重試次數,如果是負數或者沒有傳參,會無限次重試。重試實際上就是退訂再重新訂閱。

interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      retry(5) // 重試 5 次
    ).subscribe(x => console.log(x))

在實際開發中,如果是代碼原因造成的錯誤,重試沒有意義,如果是因為外部資源導致的異常錯誤適合重試,如用戶網路或者伺服器偶爾不穩定的時候。

3)retryWhen

和前面帶 when 的操作符一樣,retryWhen 操作符接收一個返回 Observable 的回調函數,用這個 Observable 來控制重試的節奏。當這個 Observable 發出一個數據時就會進行一次重試,它完結時 retryWhen 返回的 Observable 也立即完結。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  retryWhen(err$ => err$.pipe(
    delay(1000),
    take(5))
  ) // 延遲 1 秒後重試,重試 5 次
).subscribe(x => console.log(x))

retryWhen 的可定製性非常高,不僅可以實現延遲定製,還可以實現 retry 的控制重試次數。在實踐中,這種重試頻率固定的方法還不夠好,如果之前的重試失敗,之後重試成功的幾率也不高。Angular 官網介紹了一個 Exponential backoff 的方法。將每次重試的延遲時間控製為指數級增長。

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
 
function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));
 
function handleData(data) {
  // ...
}

4)finalize

返回上游數據流的鏡像 Observable,當上游的 Observable 完結或出錯時調用傳給它的函數,不影響數據流。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  finalize(() => console.log('finally'))
).subscribe(x => console.log('a'))

tap 操作符

我們可以使用 tap 操作符來進行調試。

攔截源 Observable 的每一次發送,執行一個函數,返回源 Observable 的鏡像 Observable。

這個 API 有助於我們對 Observable 的值進行驗證(debug)和執行一個會帶來副作用的函數,而不會影響源 Observable。如我們用滑鼠進行 canvas 繪圖,滑鼠按下是開始畫圖,滑鼠鬆開即停止。我們需要在 mousedown 的時候進行 moveTo,否則這次畫的會和上次畫的連在一起。我們應該把這個會帶來副作用過程放在 tap 操作符的函數中,這樣才不會影響原來的數據流。

tap 操作符和訂閱並不相同,tap 返回的 Observable 如果沒有被訂閱,tap 中產生副作用的函數並不會執行。

其他一些操作符

1) repeat

repeat 用來重覆上游 Observable

2)pluck 類似 lodash 的方法 pluck,提取對象的嵌套屬性的值。

const click$ = fromEvent(document, 'click')
const tagName$ = click$.pipe(pluck('target', 'tagName'))
tagName$.subscribe(x => console.log(x))

等價於:

click$.pipe(map(e => e.target.tagName))

3)toArray

將發出的數據匯聚為數組

interval(1000).pipe(
  take(3),
  toArray()
).subscribe(x => console.log(x))
// [0, 1, 2]

4)partition

將上游的 Observable 分為兩個,一個 Observable 的數據是符合判定的數據,另一個時不符合判定的數據。

const part$ = interval(1000).pipe(
  take(6),
  partition(x => x % 2 === 0)
)

part$[0].subscribe(x => console.log(x)) // 0, 2, 4
part$[1].subscribe(x => console.log(x)) // 1, 3, 5

5) 更多操作符

RxJS 中的操作符非常多,這裡只介紹了一部分,更多請查看官網 API

RxJS 最經典的例子——AutoComplete

有一個用於搜索的 input,當輸入時自動發送 ajax,併在下方顯示結果列表,然後可以選擇結果,這就是我們常見的 AutoComplete 效果。要實現這個效果有很多細節要考慮,如防止 race condition 和優化請求次數。

<div class="autocomplete">
    <input class="input" type="search" id="search" autocomplete="off">
    <ul id="suggest-list" class="suggest"></ul>
</div>

先獲取兩個 DOM 元素:

const input = document.querySelector('#search');
const suggestList = document.querySelector('#suggest-list');

我們先將輸入框的 input 的事件轉化為 Observable。

const input$ = fromEvent(input, 'input');

然後我們根據輸入的值去發送 ajax 請求,由於我們是要獲取最新的值而丟棄之前 ajax 返回的值,我們應該使用 switchMap 操作符。通過使用這個操作符,我們解決了 race condition 問題。

input$.pipe(
  switchMap(e => from(getSuggestList(e.target.value)))
)

getSuggestList 是一個發送 ajax 請求的方法,返回 promise,我們使用 from 來將其轉化為 Observable。

為了優化請求,首先 e.target.value 是空字元串時不應該發送請求,然後可以使用 debounceTime 減少觸發頻率,也可以使用 distinctUntilChanged 操作符來表示只有與上次不同時才去發送請求。我們還可以在 API 失敗時重試 3 次。

input$.pipe(
  filter(e => e.target.value.length > 1),
  debounceTime(300),
  distinctUntilChanged(),
    switchMap(
      e => from(getSuggestList(e.target.value)).pipe(retry(3))
    )
  )

然後我們去訂閱渲染就可以了。

對於結果列表上的點擊事件,比較簡單,具體見demo

操作符和數組方法

Observable 的操作符和數組的方法有相似之處,但是也有很大的不同,體現在以下兩點:

  1. 延遲運算
  2. 漸進式取值

延遲運算,我們之前有講到過,就是只有訂閱後才會開始對元素進行運算。

因為 Observable 是時間上的集合,操作符不是像數組方法那樣運算完所有元素再返回交給下一個方法,而是一個元素一直運算到底,就像管道中的水流一樣,先發出的數據先經過操作符的運算。

多播

前面的例子都是只有一個訂閱者的情況,實際上當然可以有多個訂閱者,這就是多播(multicast),即一個數據流的內容被多個 Observable 訂閱。

Hot Observable 和 Cold Observable

先思考一下下麵的例子結果是什麼?

const source$ = interval(1000).pipe(
  take(3)
)

source$.subscribe(x => console.log('Observer 1: ' + x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: ' + x))
}, 1000)

你可能會以為 Observer 2 一秒後才訂閱,錯過了數據 0,因此只會輸出 1 和 2,但實際上會先輸出 0。為什麼如此呢?這就涉及到對已錯過數據的兩種處理策略。

  1. 錯過的就讓它過去,只要訂閱之後生產的數據就好
  2. 不能錯過,訂閱之前生產的數據也要

第一種策略類似於直播,第二種和點播相似。使用第一種策略的 Observable 叫做 Cold Observable,因為每次都要重新生產數據,是 “冷”的,需要重新發動。第二種,因為一直在生產數據,只要使用後面的數據就可以了,所以叫 Hot Observable。

RxJS 中如 interval、range 這些方法產生的 Observable 都是 Cold Observable,產生 Hot Observable 的是由 Promise、Event 這些轉化而來的 Observable,它們的數據源都在外部,和 Observer 無關。

前面我們提到 Observable 都是 lazy evaluation 的,數據管道內的邏輯只有訂閱後才會執行,但是 Cold Observable 相對更 lazy 一些。Cold Observable 如果沒有訂閱者連數據都不會產生,對於 Hot Observable,數據仍會產生,但是不會進入管道處理。

Hot Observable 是多播,對於 Cold Observable,每次訂閱都重新生產了一份數據流,所以不是多播。下麵的例子更加明顯,兩個訂閱者有很大的概率會接收到不同的數據。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

source$.subscribe(x => console.log('Observer 1: ' + x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: ' + x))
}, 1000)

如果想要實現多播,就要使用 RxJS 中 Subject。

Subject

為了防止每次訂閱都重新生產一份數據流,我們可以使用中間人,讓這個中間人去訂閱源數據流,觀察者都去訂閱這個中間人。這個中間人能去訂閱數據流,所以是個 Observer,又能被觀察者訂閱,所以也是 Observable。我們可以自己實現一個這樣的中間人:

const subject = {
  observers: [],
  subscribe: function (observer) {
    this.observers.push(observer)
  },
  next: function (value) {
    this.observers.forEach(o => o.next(value))
  },
  error: function (error) {
    this.observers.forEach(o => o.error(error))
  },
  complete: function () {
    this.observers.forEach(o => o.complete())
  }
}

這個 subject 擁有 Observer 的 next、error、complete 方法,每次被觀察者訂閱時都會在內部保存這個觀察者。當接收到源數據流的數據時,會把數據發送給每一個觀察者。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source$.subscribe(subject)
subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 1000)

這時我們發現兩個觀察者接收到的是同一份數據,ObserverB 由於延遲一秒訂閱,所以少接收到一個數據。將我們自己實現的 subject 換成 RxJS 中的 Subject,效果相同:

import { Subject } from 'rxjs'
const subject = new Subject()

從上面可以看到,Subject 和 Observable 有一個很大的不同:它內部保存有一個觀察者列表。

前面的 subject 是在源數據流發出值時調用 next 方法,向訂閱的觀察者發送這個值,我們也可以手動調用 subject 的next 方法送出值:

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new Subject()

subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 500)

subject.next(1)
setTimeout(() => {
  subject.next(2)
}, 1000)

總結一下,Subject 既是 Observable 又是 Observer,它會對內部的 observers 清單進行組播(multicast)。

Subject 的錯誤處理

在 RxJS 5 中,如果 Subject 的某個下游數據流產生了錯誤異常,而又沒有被 Observer 處理,那這個 Subject 的其他 Observer 都會失敗。但是在 RxJS 6 中不會如此。

在 v6 的這個例子 中,ObserverA 沒有對錯誤進行處理,但是並不影響 ObserverB,而在 v5 這個demo中因為 ObserverA 沒有對錯誤進行處理,使得 ObserverB 終止了。很明顯 v6 的這種處理更符合直覺。

BehaviorSubject、ReplaySubject、AsyncSubject

1)BehaviorSubject

BehaviorSubject 需要在實例化時給定一個初始值,如果沒有預設是 undefined,每次訂閱時都會發出最新的狀態,即使已經錯過數據的發送時間。

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new BehaviorSubject(0)

subject.subscribe(observerA) // Observer A: 0

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3

setTimeout(() => {
  subject.subscribe(observerB) // Observer B: 3
}, 500)

observerB 已經錯過流數據的發送時間,但是訂閱時也能獲取到最新數據 3。

BehaviorSubject 有點類似於狀態,一開始可以提供初始狀態,之後訂閱都可以獲取最新的狀態。

2)ReplaySubject

ReplaySubject 表示重放,在新的觀察者訂閱時重新發送原來的數據,可以通過參數指定重放最後幾個數據。

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new ReplaySubject(2) // 重放最後兩個

subject.subscribe(observerA)

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3
subject.complete()

setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 2
  // Observer B: 3
}, 500)

這裡我們可以看到,即使 subject 完結後再去訂閱依然可以重放最後兩個數據。

ReplaySubject(1) 和前面的 BehaviorSubject 是不一樣的,首先後者可以提供預設數據,而前者不行,其次前者在 subject 終結後再去訂閱依然可以得到最近發出的數據而後者不行。

3)AsyncSubject

AsyncSubject 有點類似 operator last,會在 subject 完結後送出最後一個值。

const subject = new AsyncSubject()

subject.subscribe(observerA)

subject.next(1)
subject.next(2)
subject.next(3)
subject.complete()
// Observer A: 3
setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 3
}, 500)

observerA 即使早就訂閱了,但是並不會響應前面的 next,完結後才接收到最後一個值 3。

多播操作符

前面我們寫的 Subject 需要去訂閱源數據流和被觀察者訂閱,寫起來比較繁瑣,我們可以藉助操作符來實現。

1)multicast

使用方式如下,接收一個 subject 或者 subject factory。這個操作符返回了一個 connectable 的 Observable。等到執行 connect() 才會用真的 subject 訂閱 source,並開始發送數據,如果沒有 connect,Observable 是不會執行的。

const source = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3),
  multicast(new Subject)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA) // subject.subscribe(observerA)

source.connect() // source.subscribe(subject)

setTimeout(() => {
  source.subscribe(observerB) // subject.subscribe(observerB)
}, 1000)

2)refCount

上面使用了 multicast,但是還是有些麻煩,還需要去手動 connect。這時我們可以再搭配 refCount 操作符創建只要有訂閱就會自動 connect 的 Observable。只需要去掉 connect 方法調用,在 multicast 後面再加一個 refCount 操作符。

multicast(new Subject),
refCount()

refCount 其實就是自動計數的意思,當 Observer 數量大於 1 時,subject 訂閱上游數據流,減少為 0 時退訂上游數據流。

3)multicast selector 參數

multicast 第一個參數除了是一個 subject,還可以是一個 subject factory,即返回 subject 的函數。這時使用了不同的中間人,每個觀察者訂閱時都重新生產數據,適用於退訂了上游之後再次訂閱的場景。

multicast 還可以接收可選的第二個參數,稱為 selector 參數。它可以使用上游數據流任意多次,而不會重覆訂閱上游的數據。當使用了這個參數時,multicast 不會返回 connectable Observable,而是這個參數(回調函數)返回的 Observable。selecetor 回調函數有一個參數,通常叫做 shared,即 multicast 第一個參數所代表的 subject 對象。

const selector = shared => {
  return shared.pipe(concat(of('done')))
}
const source = interval(1000).pipe(
  take(3),
  multicast(new Subject, selector)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA)
setTimeout(() => {
  source.subscribe(observerB)
}, 5000)
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: done
// Observer A completed
// Observer B: done
// Observer B: completed

observerB 訂閱時會調用 selector 函數,subject 即shared 已經完結,但是 concat 依然會在這個 Observable 後面加上 'done'。

可以利用 selector 處理 “三角關係”的數據流,如有一個 tick$ 數據流,對其進行 delay(500) 操作後的下游 delayTick$, 一個由它們合併得到的 mergeTick$,這時就形成了三角關係。delayTick$ 和 mergeTick$ 都訂閱了 tick$。

const tick$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: ' + x))
)

const delayTick$ = tick$.pipe(
  delay(500)
)

const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: ' + x))
// source: 0
// observer: 0
// source: 0
// observer: 0

從上面的結果我們可以驗證,tick$ 被訂閱了兩次。

我們可以使用 selector 函數來使其只訂閱一次,將上面的過程移到 selector 函數內即可。

const source$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: ' + x))
)

const result$ = source$.pipe(
  multicast(new Subject(), shared => {
    const tick$ = shared
    const delayTick$ = tick$.pipe(delay(500))
    const mergeTick$ = merge(tick$, delayTick$)
    return mergeTick$
  })
)

result$.subscribe(x => console.log('observer: ' + x

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1 建立備份數據表 CREATE TABLE [dbo].[ProcBackup]( [id] [int] IDENTITY(1,1) NOT NULL, [name] [sysname] NOT NULL, [db] [nvarchar](50) NULL, [obj_id] [int] NULL ...
  • 在這裡還是要推薦下我自己建的大數據學習交流群:119599574,群里都是學大數據開發的,如果你正在學習大數據 ,小編歡迎你加入,大家都是軟體開發黨,不定期分享乾貨(只有大數據軟體開發相關的),包括我自己整理的一份2018最新的大數據進階資料和高級開發教程,歡迎進階中和進想小伙伴加入。在這裡還是要推 ...
  • 原文出處:https://www.infoworld.com/article/3210905/sql/10-essential-performance-tips-for-mysql.html MySQL的10個基本性能技巧 與所有的關係資料庫一樣,MySQL正如一頭怪獸一般, 它可能會在接到通知的一 ...
  • MySQL連接方式MySQL除了最常見的TCP連接方式外,還提供SOCKET(LINUX預設連接方式)、PIPE和SHARED MEMORY連接方式。各連接方式的伺服器、客戶端啟動選項,及連接預設值見下表: TCP連接(Linux,Windows):目標 啟動選項 預設值SERVER --port= ...
  • MySQL的uuid這個函數。簡要介紹一下。 用法 簡單看到,這個值,每次執行都是不同的。 生成規則 第1 2 3 段是與時間有關的。 time_low、time_mid、time_high_and_version轉成16進位後分別對應第1 2 3段。這個時間是從1582-10-15 00:00:0 ...
  • 我對CSS選擇器的認識 一、簡述 CSS選擇器是對HTML元素進行選擇的篩選條件,大概可以分為兩類: 在真正使用的時候,幾個簡單的選擇器可以組合成更複雜的選擇器,所以誰也說不上CSS選擇器有多少。還有兩個選擇器是功能性的,它們可以給元素已有內容之前或之後添加新內容。 我做了一個項目,裡面包含一個測試 ...
  • TCP三次握手 客戶端與伺服器之間互相發送HTTP請求響應之前需要先進行TCP連接,因為HTTP是一個無連接、無狀態協議,不存在連接的概念,只有請求和響應的概念。而請求和響應實際上只是數據包,他們需要傳輸通道進行傳輸,而這個傳輸通道就是TCP創建的通道。那麼這個通道是如何創建的呢?就是通過TCP三次 ...
  • angularjs學習第二天筆記---過濾器。主要學習了過濾器的使用方式,以及內置過濾器之:貨幣過濾器(currency)、時間過濾器(date) ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...