一、概述 在 RxJava 中,一個實現了 介面的對象可以訂閱一個 類的實例。訂閱者對 發射的任何數據或數據序列作出響應。這種模式簡化了併發操作,因為它不需要阻塞等待 發射數據,而是創建了一個處於待命狀態的觀察者哨兵,哨兵在未來某個時刻響應 的通知。RxJava 提供了一套非同步編程的 API,並且支 ...
一、概述
在 RxJava 中,一個實現了 Observer
介面的對象可以訂閱一個 Observable
類的實例。訂閱者對 Observable
發射的任何數據或數據序列作出響應。這種模式簡化了併發操作,因為它不需要阻塞等待 Observable
發射數據,而是創建了一個處於待命狀態的觀察者哨兵,哨兵在未來某個時刻響應 Observable
的通知。RxJava 提供了一套非同步編程的 API,並且支持鏈式調用,所以使用 RxJava 編寫的代碼的邏輯會非常簡潔
RxJava 有以下三個最基本的元素:
- 被觀察者(Observable)
- 觀察者(Observer)
- 訂閱(subscribe)
創建被觀察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) {
Log.e(TAG, "subscribe");
Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
創建觀察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
完成觀察者與被觀察者之間的訂閱關係
observable.subscribe(observer);
也可以以鏈式調用的方式來完成訂閱
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) {
Log.e(TAG, "subscribe");
Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
最終的輸出結果是一樣的
onSubscribe
subscribe
currentThread name: main
onNext: 1
onNext: 2
onNext: 3
onComplete
被觀察者發送的事件類型有以下幾種
事件種類 | 作用 |
---|---|
onNext() | 發送該事件時,觀察者會回調 onNext() 方法 |
onError() | 發送該事件時,觀察者會回調 onError() 方法,當發送該事件之後,其他事件將不會繼續發送 |
onComplete() | 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件之後,其他事件將不會繼續發送 |
下麵來講解 RxJava 中各種常見的操作符
二、創建操作符
2.1、create()
用於創建一個 Observable
。一個正確的 Observable
必須嘗試調用觀察者的 onCompleted
方法或者 onError
方法有且僅有一次,而且此後不能再調用Observable
的任何其它方法
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) {
Log.e(TAG, "subscribe");
Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
2.2、just()
創建一個 Observable
併發送事件,發送的事件總數不可以超出十個
Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
onSubscribe
onNext: 1
onNext: 2
onNext: 3
onComplete
2.3、fromArray
和 just()
類似,但 fromArray
可以傳入多於十個的變數,並且可以傳入一個數組
Integer[] arrays = new Integer[]{1, 2, 3};
Observable.fromArray(arrays).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
2.4、fromCallable
這裡的 Callable
是指 java.util.concurrent
中的 Callable
,Callable
和 Runnable
的用法基本一致,只是它包含一個返回值,這個結果值就是發給觀察者的
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() {
return 100;
}
});
2.5、fromFuture
這裡的 Future
是指 java.util.concurrent
中的 Future
,Future
的作用是增加了 cancel()
等方法操作 Callable
,它可以通過 get()
方法來獲取 Callable
返回的值
final FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() {
return 12;
}
});
Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) {
futureTask.run();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "accept: " + integer);
}
});
2.6、fromIterable()
用於發送一個 List
集合數據給觀察者
List<Integer> integerList = new ArrayList<>();
integerList.add(1);
integerList.add(2);
integerList.add(3);
Observable.fromIterable(integerList).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "accept: " + integer);
}
});
2.7、defer()
defer
操作符會一直等待直到有觀察者訂閱它,然後它使用 Observable
工廠方法生成一個 Observable
。它對每個觀察者都這樣做,因此儘管每個訂閱者都以為自己訂閱的是同一個 Observable
,實際上每個訂閱者獲取到的都是它們自己的單獨的數據序列。在某些情況下,直到發生訂閱時才生成 Observable
可以確保 Observable
包含最新的數據
//全局變數
private Integer value = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() {
return Observable.just(value);
}
});
value = 200;
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "accept: " + integer);
}
});
value = 300;
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "accept: " + integer);
}
});
accept: 200
accept: 300
defer()
操作符能使得每次訂閱操作都創建被觀察者,因此兩次訂閱操作會創建不同的被觀察者對象,因此兩次列印操作返回的值並不一樣
2.8、timer()
延遲指定時間後會發送一個大小為 0L
的值給觀察者
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) {
}
});
2.9、interval()
每隔一段時間就發送一個事件,傳遞的值從 0 開始並不斷增 1
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) {
Log.e(TAG, "value is: " + aLong);
}
});
2.10、intervalRange()
可以指定發送事件的開始值和數量,其他與 interval()
的功能一樣
Observable.intervalRange(2, 3, 4, 5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
起始值從 2 開始遞增,事件共傳遞三次,第一次事件在訂閱後延遲 4 秒觸發,之後每次延遲 5 秒
10-06 10:48:40.017 17976-17976/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 10:48:44.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:2
10-06 10:48:49.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:3
10-06 10:48:54.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:4
10-06 10:48:54.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onComplete
2.11、range()
發送指定範圍的事件序列
Observable.range(2, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "values is :" + integer);
}
});
values is :2
values is :3
values is :4
values is :5
values is :6
2.12、rangeLong()
作用與 range()
一樣,只是數據類型是 Long
Observable.rangeLong((2, 5)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) {
Log.e(TAG, "values is :" + aLong);
}
});
2.13、empty() & never() & error()
empty()
直接發送 onComplete()
事件
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Object object) {
Log.e(TAG, "onNext: " + object);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
列印結果
onSubscribe
onComplete
換成 never()
onSubscribe
換成 error()
Observable.error(new Throwable("Hello")).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Object object) {
Log.e(TAG, "onNext: " + object);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
onSubscribe
onError: Hello
三、轉換操作符
3.1、map()
map()
用於將被觀察者發送的數據類型轉變成其他的類型
Observable.just(1, 2, 3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return "I'm " + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, s);
}
});
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 1
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 2
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 3
3.2、flatMap()
用於將事件序列中的元素進行整合加工,返回一個新的被觀察者
List<List<String>> listArrayList = new ArrayList<>();
List<String> stringList = new ArrayList<>();
for (int j = 0; j < 2; j++) {
stringList.add("A_" + j);
}
listArrayList.add(stringList);
stringList = new ArrayList<>();
for (int j = 0; j < 2; j++) {
stringList.add("B_" + j);
}
listArrayList.add(stringList);
Observable.fromIterable(listArrayList).flatMap(new Function<List<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(List<String> stringList1) throws Exception {
return Observable.fromIterable(stringList1);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "value is: " + s);
}
});
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: A_0
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: A_1
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: B_0
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: B_1
3.3、concatMap()
concatMap()
和 flatMap()
基本一樣,只不過 concatMap()
轉發出來的事件是有序的,而 flatMap()
是無序的
還是用 flatMap()
的例子來看
Observable.fromIterable(listArrayList).flatMap(new Function<List<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(List<String> stringList1) throws Exception {
if (stringList1.get(0).startsWith("A")) {
return Observable.fromIterable(stringList1).delay(200, TimeUnit.MILLISECONDS);
}
return Observable.fromIterable(stringList1);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "value is: " + s);
}
});
進行了一次延時操作,可以看到兩次事件的發送順序顛倒了
10-06 11:07:30.753 18702-18702/leavesc.hello.rxjavademo E/MainActivity: value is: B_0
10-06 11:07:30.753 18702-18702/leavesc.hello.rxjavademo E/MainActivity: value is: B_1
10-06 11:07:30.953 18702-18716/leavesc.hello.rxjavademo E/MainActivity: value is: A_0
10-06 11:07:30.953 18702-18716/leavesc.hello.rxjavademo E/MainActivity: value is: A_1
使用 concatMap()
則順序將保持一致
3.4、buffer()
從需要發送的事件當中獲取指定數量的事件,並將這些事件放到緩衝區當中一併發出。buffer
有兩個參數,參數一count
用於指點緩衝區大小,參數二 skip
用指定當緩衝區滿了時,發送下一次事件序列的時候要跳過多少元素
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(2, 2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(List<Integer> integers) {
Log.e(TAG, "緩衝區大小: " + integers.size());
for (Integer i : integers) {
Log.e(TAG, "元素: " + i);
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
onSubscribe
緩衝區大小: 2
元素: 1
元素: 2
緩衝區大小: 2
元素: 3
元素: 4
緩衝區大小: 2
元素: 5
元素: 6
onComplete
3.5、groupBy()
用於將數據進行分組,每個分組都會返回一個被觀察者。groupBy()
方法的返回值用於指定分組名,每返回一個新值就代表會創建一個分組
Observable.just(1, 2, 3, 4, 5, 6, 7)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
if (integer < 4) {
return "hello";
}
return "hi";
}
})
.subscribe(new Observer<GroupedObservable<String, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(final GroupedObservable<String, Integer> observable) {
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "GroupedObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "GroupedObservable onNext key :" + observable.getKey());
Log.e(TAG, "GroupedObservable onNext value :" + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "GroupedObservable onError");
}
@Override
public void onComplete() {
Log.e(TAG, "GroupedObservable onComplete");
}
});
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 11:16:35.616 19015-19015/? E/MainActivity: onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :1
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :2
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :3
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :4
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :5
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :6
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :7
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onComplete
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onComplete
10-06 11:16:35.616 19015-19015/? E/MainActivity: onComplete
3.6、scan()
scan()
操作符對原始 Observable
發射的第一條數據應用一個函數,然後將那個函數的結果作為自己的第一項數據發射。它將函數的結果同第二項數據一起填充給這個函數來產生它自己的第二項數據。它持續進行這個過程來產生剩餘的數據序列
Observable.just(1, 5, 8, 12).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
Log.e(TAG, "integer : " + integer);
Log.e(TAG, "integer2 : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "accept : " + integer);
}
});
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 1
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 5
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 6
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 6
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 8
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 14
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 14
10-06 11:25:19.409 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 12
10-06 11:25:19.409 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 26
四、組合操作符
4.1、concat() & concatArray()
用於將多個觀察者組合在一起,然後按照參數的傳入順序發送事件,concat()
最多只可以發送4個事件
Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8)).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer);
}
});
accept: 1
accept: 2
accept: 3
accept: 4
accept: 5
accept: 6
accept: 7
accept: 8
concatArray()
作用與 concat()
作用一樣,不過前者可以發送多於 4 個的被觀察者
4.2、merge() & mergeArray()
這個方法與 concat()
作用基本一樣,只是 concat()
是串列發送事件,而 merge()
並行發送事件
Observable.merge(Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) {
return "Test_A_" + aLong;
}
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) {
return "Test_B_" + aLong;
}
})).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: " + s);
}
});
Test_A_0
Test_B_0
Test_A_1
Test_B_1
Test_A_2
Test_B_2
Test_B_3
Test_A_3
Test_A_4
Test_B_4
Test_A_5
Test_B_5
mergeArray()
可以發送 4 個以上的被觀察者
4.3、concatArrayDelayError() & mergeArrayDelayError()
在 concatArray()
和 mergeArray()
兩個方法當中,如果其中有一個被觀察者發送了一個 Error
事件,那麼就會停止發送事件,如果想 onError()
事件延遲到所有被觀察者都發送完事件後再執行的話,可以使用 concatArrayDelayError()
和 mergeArrayDelayError()
首先使用 concatArray()
來驗證其發送 onError()
事件是否會中斷其他被觀察者的發送事件
Observable.concatArray(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}), Observable.just(30, 40, 50)).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
}
});
onNext: 1
onNext: 2
onError: Normal Exception
從結果可以知道,確實中斷了,現在換用 concatArrayDelayError()
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 1
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 2
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 30
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 40
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 50
10-06 04:00:04.935 6514-6514/? E/MainActivity: onError: Normal Exception
從結果可以看到,onError
事件是在所有被觀察者發送完事件才發送的
4.4、zip()
zip()
操作符返回一個 Obversable
,它使用這個函數按順序結合兩個或多個 Observables 發射的數據項,然後它發射這個函數返回的結果。它按照嚴格的順序應用這個函數。它只發射與發射數據項最少的那個 Observable 一樣多的數據
Observable.zip(Observable.just(1, 2, 3, 4), Observable.just(5, 6, 7, 8, 9),
new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return String.valueOf(integer) + "_" + String.valueOf(integer2);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
accept: 1_5
accept: 2_6
accept: 3_7
accept: 4_8
4.5、combineLatest() & combineLatestDelayError()
combineLatest()
的作用與 zip()
類似,combineLatest()
發送事件的序列是與發送的時間線有關的,當兩個 Observables
中的任何一個發射了一個數據時,通過一個指定的函數組合每個 Observable
發射的最新數據,然後發射這個函數的結果
Observable.zip(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) {
String s1 = "A" + aLong;
Log.e(TAG, "A 發送的事件 " + s1);
return s1;
}
}), Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) {
String s1 = "B" + aLong;
Log.e(TAG, "B 發送的事件 " + s1);
return s1;
}
}),
new BiFunction<String, String, String>() {
@Override
public String apply(String value1, String value2) throws Exception {
return value1 + "_" + value2;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
10-06 05:17:06.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A1
10-06 05:17:07.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A2
10-06 05:17:07.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B1
10-06 05:17:07.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A1_B1
10-06 05:17:08.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A3
10-06 05:17:08.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B2
10-06 05:17:08.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A2_B2
10-06 05:17:09.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B3
10-06 05:17:09.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A3_B3
10-06 05:17:09.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A4
10-06 05:17:10.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B4
10-06 05:17:10.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A4_B4
當發送 A1 和 A2 事件時,B 並沒有發送任何事件,所以不會觸發到 accept
方法。當發送了 B1 事件之後,就會與 A 最新發送的事件 A2 結合成 A1_B2,之後的發射規則也以此類推
combineLatestDelayError()
多了延遲發送 onError()
的功能
4.6、reduce()
與 scan()
操作符的作用類似,也是將發送數據以一定邏輯聚合起來,區別在於 scan()
每處理一次數據就會將事件發送給觀察者,而 reduce()
會將所有數據聚合在一起才會發送事件給觀察者
Observable.just(1, 3, 5, 7).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e(TAG, "integer1 : " + integer);
Log.e(TAG, "integer2 : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
integer1 : 1
integer2 : 3
integer1 : 4
integer2 : 5
integer1 : 9
integer2 : 7
accept : 16
4.7、collect()
collect()
與 reduce()
類似,但它的目的是收集原始 Observable 發射的所有數據到一個可變的數據結構
Observable.just(1, 2, 3, 4)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.e(TAG, "accept : " + integers);
}
});
accept : [1, 2, 3, 4]
4.8、startWith() & startWithArray()
在發送事件之前追加事件,startWith()
追加一個事件,startWithArray()
可以追加多個事件,追加的事件會先發出
Observable.just(4, 5)
.startWithArray(2, 3)
.startWith(1)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 4
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 5
4.9、count()
返回被觀察者發送事件的數量
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "aLong : " + aLong);
}
});
aLong : 3
五、功能操作符
5.1、delay()
延遲一段事件再發送事件
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.e(TAG, "value : " + value);
}
});
5.2、doOnEach()
Observable
發送一次事件之前都會回調這個方法
Observable.just(1, 2, 3)
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.e(TAG, "integerNotification value : " + integerNotification.getValue());
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.e(TAG, "accept : " + value);
}
});
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : null
5.3、doOnNext()
Observable
發送 onNext()
之前都會先回調這個方法
Observable.just(1, 2, 3)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "doOnNext accept : " + integer);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.e(TAG, "accept : " + value);
}
});
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 3
5.4、doAfterNext()
Observable
發送 onNext()
之後都會回調這個方法
Observable.just(1, 2, 3)
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "doOnNext accept : " + integer);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.e(TAG, "accept : " + value);
}
});
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3
5.5、doOnComplete()
Observable
調用 onComplete()
之前都會回調這個方法
Observable.just(1, 2, 3)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete run()");
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.e(TAG, "accept : " + value);
}
});
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: doOnComplete run()
5.6、doOnError()
Observable
發送 onError()
之前都會回調這個方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "doOnError accept() : " + throwable.getMessage());
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
}
});
10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 1
10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 2
10-06 06:14:17.894 9230-9230/? E/MainActivity: doOnError accept() : Normal Exception
10-06 06:14:17.894 9230-9230/? E/MainActivity: onError : Normal Exception
5.7、doOnSubscribe()
Observable
發送 onSubscribe()
之前會回調這個方法
5.8、doOnDispose()
當調用 Disposable
的 dispose()
之後會回調該方法
5.9、doOnLifecycle()
在回調 onSubscribe
之前回調該方法的第一個參數的回調方法,可以使用該回調方法決定是否取消訂閱,doOnLifecycle()
第二個參數的回調方法的作用與 doOnDispose()
一樣
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle accept");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle run");
}
}).subscribe(new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
this.disposable = d;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
disposable.dispose();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle accept
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle run
5.10、doOnTerminate() & doAfterTerminate()
doOnTerminate
是在 onError
或者 onComplete
發送之前回調,而 doAfterTerminate
則是 onError
或者 onComplete
發送之後回調
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnTerminate run");
}
}).doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate run");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:34:55.968 9713-9713/? E/MainActivity: onSubscribe
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 1
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 2
10-06 06:34:55.968 9713-9713/? E/MainActivity: doOnTerminate run
10-06 06:34:55.968 9713-9713/? E/MainActivity: onComplete
10-06 06:34:55.968 9713-9713/? E/MainActivity: doAfterTerminate run
5.11、doFinally()
在所有事件發送完畢之後回調該方法。 doFinally()
和 doAfterTerminate()
的區別在於取消訂閱時,如果取消訂閱,之後 doAfterTerminate()
就不會被回調,而 doFinally()
無論怎麼樣都會被回調,且都會在事件序列的最後
5.12、onErrorReturn()
當接受到一個 onError()
事件之後回調,返回的值會回調 onNext()
方法,並正常結束該事件序列
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 7;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 7
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onComplete
5.13、onErrorResumeNext()
當接收到 onError()
事件時,返回一個新的 Observable
,並正常結束事件序列
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.e(TAG, "onErrorResumeNext apply: " + throwable.getMessage());
return Observable.just(4, 5, 6);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onErrorResumeNext apply: Normal Exception
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onComplete
5.14、 onExceptionResumeNext()
與 onErrorResumeNext()
作用基本一致,但是這個方法只能捕捉 Exception
,不能捕獲 Error
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
Log.e(TAG, "onExceptionResumeNext subscribeActual");
observer.onNext(3);
observer.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onExceptionResumeNext subscribeActual
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onComplete
將 emitter.onError(new Exception("Normal Exception"))
改為 emitter.onError(new Error("Normal Exception"));
異常將不會被捕獲
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception
5.15、retry()
如果出現錯誤事件,則會重新發送所有事件序列指定次數
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Error("Normal Exception"));
}
}).retry(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 06:55:17.273 10591-10591/? E/MainActivity: onSubscribe
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onError : Normal Exception
5.16、retryUntil()
出現錯誤事件之後,可以通過此方法判斷是否繼續發送事件
private int index = 1;
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Normal Exception"));
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
Log.e(TAG, "getAsBoolean");
return index == 7;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
index++;
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception
5.17、repeat()
以指定次數重覆發送被觀察者的事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).repeat(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 07:38:47.680 12155-12155/? E/MainActivity: onSubscribe
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onComplete
5.18、repeatWhen()
返回一個新的被觀察者來決定是否重覆發送事件。如果新的被觀察者返回 onComplete
或者 onError
事件,則舊的被觀察者不會發送事件。如果新的被觀察者返回其他事件,則舊的觀察者會發送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
// return Observable.empty();
// return Observable.error(new Exception("Normal Exception"));
// return Observable.just(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext : " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
以上三種情況的輸出結果分別是
10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onComplete
10-06 14:29:36.150 21027-21027/?