RxJs 中創建操作符是創建數據流的起點,這些操作符可以憑空創建一個流或者是根據其它數據形式創建一個流。 Observable的構造函數可以直接創建一個數據流,比如: const $source=new Observable(observer=>{ observer.next(1); observe ...
RxJs 中創建操作符是創建數據流的起點,這些操作符可以憑空創建一個流或者是根據其它數據形式創建一個流。 Observable的構造函數可以直接創建一個數據流,比如:
const $source=new Observable(observer=>{ observer.next(1); observer.next(2); observer.next(3); })
但是在真正的使用過程中很少使用這種方式去創建,RxJx 提供了大量的創建操作符供我們在開發中去使用。創建型操作符打不風都是靜態操作符。
一、創建同步數據流
同步數據流,或者說同步Observable對象,需要關⼼的就是: 1.產⽣哪些數據 2. 數據之間的先後順序如何。 對於同步數據流,數據之間的時間間隔不存在,所以不需要考慮時間 ⽅⾯的問題。
1、create 最簡單的操作符
它的使用像這樣:
import {Observable} from 'rxjs/Observable'; const onSubscribe=observer=>{ observer.next(1); observer.next(2); observer.next(3); } const source$=Observable.create(onSubscribe); var theObserver={ next:item=>console.log(item) } // var theObserver={}; // theObserver.next=item=>console.log(item) source$.subscribe(theObserver);//output:1,2,3
2、of 列舉數據
of 可以根據一個指定的數據集創建一個Observable
(1)Observabe 的靜態方法創建(這種靜態創建是將of操作符掛載在Javascript的prototype上類似這樣Observable.prototype.of=of 這樣就成了Observable的全局函數了)
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/of'; const source$=Observable.of(1,2,3); source$.subscribe(console.log);
(2)從rxjs/observable下導入包創建 (這種of沒有直接掛載在prototype上)
import { of } from "rxjs/observable/of"; const source$=of(1,2,3); source$.subscribe(console.log);//output 1,2,3
3、range 指定範圍
range可以產生一個連續的序列,指定起始值和長度
import { Observable } from 'rxjs/Observable'; import { range } from 'rxjs/add/observable/range'; const source$ = Observable.range(1, 100); source$.subscribe(console.log);//output 1 to 100
range 的第一個參數可以是小數,大師每次只能遞增1
像這樣
import { Observable } from 'rxjs/Observable'; import { range } from 'rxjs/add/observable/range'; const source$ = Observable.range(1.5, 100); source$.subscribe(console.log);//output 1.5 to 100.5
4、generate 產生一個定製的序列
import { Observable } from "rxjs/Observable"; import { generate } from 'rxjs/add/observable/generate'; const source=Observable.generate(3, condation=>condation<10, step=>step+2, result=>result*result) .subscribe(console.log);//output 9,25,49,91
第一個參數是初始值,第二個是結束的條件,第三個是每次遞增的步長,第四個參數是對每次產生結果的一個回調處理,這個操作符和 for迴圈類似,上面的這個語句的意思是和下麵的語句一樣
for (let i = 3; i < 10; i = i + 2) {
console.log(i * i);
}
generate 不限於產生數字的序列,比如像 這樣:
Observable.generate('a', c=>c.length<5, s=>s+'a', r=>r).subscribe(console.log);//a aa aaa aaaa
5、repeat 重覆數據的數據流
這個是一個實例操作符 可以通過下麵兩種方式導入
import {repeat} from 'rxjs/add/operator/repeat'; //or import {repeat} from 'rxjs/operators/repeat';
將上游的數據流的數據重覆設置的次數:
import {repeat} from 'rxjs/add/operator/repeat'; import 'rxjs/add/Observable/of'; import { Observable } from 'rxjs/Observable'; //or //import {repeat} from 'rxjs/operators/repeat'; Observable.of(1,2,3) .repeat(3) .subscribe(console.log);//output:1,2,3 1,2,3 1,2,3
j
將上游數據流重覆輸出3次, 實質上repeat是通過重覆訂閱上游數據流來達到重覆的效果,如果上游數據流不complete repeat就沒有效果。
repeat 的參數標識重覆的次數,如果必須輸入預期的重覆次數參數應該大於0, 如果小於0會進行無限次迴圈,程式的的執行結果將不可預期。
6、empty 直接產生一個完結的數據流.
import 'rxjs/add/observable/empty'; const source$ = Observable.empty();
7、throw 直接接拋出一個錯誤
import 'rxjs/add/observable/throw'; const source$ = Observable.throw(new Error('Somthing error'));
因為throw 是javascript的關鍵字如果不用靜態的方式使用導入時贏改是 _throw,避免和javascript關鍵字衝突
import {_throw} from 'rxjs/observable/throw'; const source$ = _throw(new Error('some error'));
8、never 創建一個Obervable 對象什麼都不錯,即不吐數據也不出錯
import 'rxjs/add/observable/never'; const source$ = Observable.never();
前面的8個操作符都是同步操作符,產生的都是同步數據流。
9、interval 和timer 定時產生數據流
這倆個是最簡單的產生非同步數據流的操作符, 它們類似javascript中的setInterval 和setTimeout。
interval接受一個參數就是產生從0開始的序列的毫秒數
import { Observable } from "rxjs/Observable"; import { interval } from "rxjs/add/observable/interval"; Observable.interval(1000).subscribe(console.log);
每次只能從0開始且每次只能遞增1,如果想改起始數字可以結合map操作符來實現, 如下是從2開始每次遞增1.
Observable.interval(1000).map(x=>x+2).subscribe(console.log);
timer地第一個參數可以傳數字也也可以穿Date,傳數字表示多好毫秒輸出0, 如果傳入的是Date 表示時間到這個Date時輸出0,第二個參數表示間隔的時間,如果第一個參數和第二個參數都輸入那就和interval的功能一樣了
下麵這兩條語句輸出的結果相同:
import { Observable } from "rxjs/Observable"; import "rxjs/add/observable/interval"; import "rxjs/add/observable/timer"; Observable.interval(1000).subscribe(console.log); Observable.timer(1000,1000).subscribe(console.log);
10、from 可以將一切轉化成Observable
import { Observable } from "rxjs/Observable"; import "rxjs/add/observable/from"; import "rxjs/add/observable/of"; Observable.from([1, 2, 3]).subscribe(console.log);//output:1,2,3 Observable.from("Hello").subscribe(console.log);//output:H,e,l,l,o function toObservable() { return Observable.from(arguments);//在JavaScript中,任何⼀個函數體中都可以通過arguments訪問所有的調 ⽤參數 } toObservable(1, 2, 3, 4).subscribe(console.log);//output:1,2,3,4 function * generateNumber(max) { for (let i = 1; i <= max; ++i) { yield i; } } Observable.from(generateNumber(3)).subscribe(console.log);//output:1,2,3 const source$=Observable.of(1,2,3); Observable.from(source$).subscribe(console.log);//output:1,2,3
11.fromPromise 非同步處理的交接
這操作符在實際使用中用的非常多。
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/from'; const promise = Promise.resolve('successfuly'); const source$ = Observable.from(promise); source$.subscribe(console.log, error => console.log('catch', error), () => console.log('complete'));
12.fromEvent 將對DOM的操作轉化成Observable
第一個參數是事件源DOM 中就是具體的HTML元素,第二個參數是事件名稱 如:click,mouseover等。
HTML:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>from event</title> <script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script> </head> <body> <div> <button id="btnTest">I'm button</button> <label id="lblTest"></label> </div> <script src="./fromEventTest.js"></script> </body> </html>
Javascript:
let number = 0; Rx.Observable.fromEvent(document.querySelector('#btnTest'), 'click') .subscribe(x => { document.querySelector('#lblTest').innerText = ++number; });
點擊按鈕label中的數字遞增1.
13、fromEventPattern 更靈活的事件轉化成Observable操作符
fromEventPattern接受兩個函數參數,分別對應產⽣的Observable對象 被訂閱和退訂時的動作,因為這兩個參數是函數,具體的動作可以任意定 義,所以可以⾮常靈活。
import { Observable } from "rxjs/Observable"; import { EventEmitter } from "events"; import 'rxjs/add/observable/fromEventPattern'; const emitter = new EventEmitter(); const addHandler = handler => { emitter.addListener('msg', handler); } const removeHandler = handler => { emitter.removeListener('msg', handler); } const source$ = Observable.fromEventPattern(addHandler, removeHandler); const subscription = source$.subscribe(console.log, error => console.log("error:" + error), console.log("complete")); emitter.emit('msg', 1); emitter.emit('msg', '2'); emitter.emit('msg', 3); subscription.unsubscribe(); emitter.emit('msg', 4);
14、ajax 通過ajax 返回的結果 創建一個Observable
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>ajax</title> <script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script> </head> <body> <div> <button id="btnTest">I'm button</button> <label id="lblTest"></label> </div> <script src="./ajax.js"></script> </body> </html>
Javascript
Rx.Observable.fromEvent(document.querySelector("#btnTest"), "click") .subscribe(y => { Rx.Observable.ajax("https://api.github.com/repos/ReactiveX/rxjs") .subscribe(x => { console.log(x); document.querySelector("#lblTest").innerText = x.response; }, error => { console.log("error:" + error); }, () => console.log("complete")); })
15、repeatWhen反覆訂閱上游的數據流並且可以設置一個等待的時間,repeat 只能反覆訂閱不能設置時間
import { Observable } from "rxjs"; Observable.of(1,2,3).repeatWhen(()=>{ return Observable.interval(1000); }).subscribe(console.log);
16.defer 延遲資源調用操作符
import 'rxjs/add/observable/defer'; import 'rxjs/add/observable/of'; import {Observable} from 'rxjs/Observable'; const observableFactory = () => Observable.of(1, 2, 3); const source$ = Observable.defer(observableFactory); source$.subscribe(console.log);