零、來源 來源:Carson_Ho-簡書 一、基礎知識 角色 作用 類比 被觀察者(Observable) 產生事件 顧客 觀察者(Observer) 接收事件,並給出響應動作 廚房 訂閱(Subscribe) 連接 被觀察者 & 觀察者 服務員 事件(Event) 被觀察者 & 觀察者 溝通的載體 ...
零、來源
來源:Carson_Ho-簡書
一、基礎知識
角色 | 作用 | 類比 |
---|---|---|
被觀察者(Observable) | 產生事件 | 顧客 |
觀察者(Observer) | 接收事件,並給出響應動作 | 廚房 |
訂閱(Subscribe) | 連接 被觀察者 & 觀察者 | 服務員 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
二、基礎使用
1.導入連接
implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
2.創建被觀察者
//創建被觀察者,產生事件
public Observable<Integer> createObservable() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
return observable;
}
3.創建觀察者
//創建觀察者
public Observer<Integer> createObserver() {
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "onSubscribe 連接");
}
@Override
public void onNext(Integer value) {
Log.v("lanjiabinRx", "onNext " + value + " 事件");
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "onError 事件");
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "onComplete 事件");
}
};
return observer;
}
4.建立subscribe()連接
//觀察者訂閱被觀察者
public void createSubscribe() {
createObservable().subscribe(createObserver());
}
5.調用和結果
createSubscribe();
6.鏈式調用
//鏈式調用
public void chainCall() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "onSubscribe 連接");
}
@Override
public void onNext(Integer value) {
Log.v("lanjiabinRx", "onNext " + value + " 事件");
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "onError 事件");
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "onComplete 事件");
}
});
}
6.切斷連接
即觀察者 無法繼續 接收 被觀察者的事件,但被觀察者還是可以繼續發送事件
Disposable mDisposable; //1.定義
@Override
public void onSubscribe(Disposable d) {
mDisposable=d; //2.賦值
Log.v("lanjiabinRx", "onSubscribe 連接");
}
@Override
public void onNext(Integer value) {
if (value==2) mDisposable.dispose(); //3.在第二個next事件斷開連接
Log.v("lanjiabinRx", "onNext " + value + " 事件");
}
三、創建操作符
0.總圖
1. create (基礎發送)
最基礎的創建
//1.create
public void chainCall() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "onSubscribe 連接");
}
@Override
public void onNext(Integer value) {
Log.v("lanjiabinRx", "onNext " + value + " 事件");
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "onError 事件");
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "onComplete 事件");
}
});
}
2. just (立刻發送10以下)
- 快速創建1個被觀察者對象(Observable)
- 發送事件的特點:直接發送傳入的事件
- 最多只能發送十個參數
- 應用場景:快速創建 被觀察者對象(Observable) & 發送10個以下事件
//2.just
public void justDo() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.d("lanjiabinRx", "接受的事件 onNext =" + integer);
}
@Override
public void onError(Throwable e) {
Log.d("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.d("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
3. fromArray (數組發送)
- 快速創建1個被觀察者對象(Observable)
- 發送事件的特點:直接發送 傳入的數組數據
- 會將數組中的數據轉換為Observable對象
應用場景:
1.快速創建 被觀察者對象(Observable) & 發送10個以上事件(數組形式)
2.數組元素遍歷
//3.fromArray
public void fromArrayDo() {
Integer[] items = {0, 1, 2, 3, 4};
Observable.fromArray(items).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
4. fromIterable (集合發送)
- 快速創建1個被觀察者對象(Observable)
- 發送事件的特點:直接發送 傳入的集合List數據
- 會將數組中的數據轉換為Observable對象
應用場景:
1.快速創建 被觀察者對象(Observable) & 發送10個以上事件(集合形式)
2.集合元素遍歷
//4.fromIterable
public void fromIterableDo(){
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
// 下列方法一般用於測試使用
<-- empty() -->
// 該方法創建的被觀察者對象發送事件的特點:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即觀察者接收後會直接調用onCompleted()
<-- error() -->
// 該方法創建的被觀察者對象發送事件的特點:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()
<-- never() -->
// 該方法創建的被觀察者對象發送事件的特點:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用
5. defer (獲取最新數據)
- 直到有觀察者(Observer )訂閱時,才動態創建被觀察者對象(Observable) & 發送事件
- 通過 Observable工廠方法創建被觀察者對象(Observable)
- 每次訂閱後,都會得到一個剛創建的最新的Observable對象,這可以確保Observable對象里的數據是最新的
應用場景:
動態創建被觀察者對象(Observable) & 獲取最新的Observable對象數據
//5.defer
Integer i = 10; //第一次賦值
public void deferDo() {
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 15; //第二次賦值
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:得到最新賦值的數字,說明取到了最新的數據
6. timer (延遲發送)
- 快速創建1個被觀察者對象(Observable)
- 發送事件的特點:延遲指定時間後,發送1個數值0(Long類型)
- 本質 = 延遲指定時間後,調用一次 onNext(0)
應用場景:
延遲指定事件,發送一個0,一般用於檢測
//6.timer
public void timerDo() {
// 註:timer操作符預設運行在一個新線程上
// 也可自定義線程調度器(第3個參數):timer(long,TimeUnit,Scheduler)
//TimeUnit.SECONDS延遲2s後,發送一個0
/**
* timer(long delay, TimeUnit unit)
* delay 數值
* unit 單位
* 下麵就是 2數值,單位為秒,所以是2秒
* */
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Long aLong) {
/*
* 得到的結果為0 一般用於檢測
* */
Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
7. interval (周期發送,無限)
- 快速創建1個被觀察者對象(Observable)
- 發送事件的特點:每隔指定時間 就發送 事件
- 發送的事件序列 = 從0開始、無限遞增1的的整數序列
//7.interval
public void intervalDo() {
/**
* 從0開始遞增
*
* @param initialDelay (Long)
* 初始延遲時間(第一次延遲時間)
* @param period (Long)
* 後續數字發射之間的時間間隔(一個周期時間)
* @param unit
* 時間單位
* */
Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Long aLong) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
8. intervalRange (周期發送,有限,指定數據)
- 作用類似於interval(),但可指定發送的數據的數量
//8.intervalRange
public void intervalRangeDo() {
/**
*
* @param start 起始值
* @param count 總共要發送的值的數量,如果為零,則運算符將在初始延遲後發出onComplete
* @param initialDelay 發出第一個值(開始)之前的初始延遲
* @param period 後續值之間的時間段
* @param unit 時間單位
* */
Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Long aLong) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:3-12 經歷10個數
9. range (無延遲,Integer類型指定數據)
- 作用類似於intervalRange(),但區別在於:無延遲發送事件
//9.range
public void rangeDo(){
/**
* @param start
* 序列中第一個Integer的值
* @param count
* 要生成的順序整數的數量
* */
Observable.range(3,5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
10. rangeLong (無延遲,Long類型指定數據)
//10.rangeLong
public void rangeLongDo(){
/**
* @param start
* Long類型,序列中第一個Integer的值
* @param count
* Long類型,要生成的順序整數的數量
* */
Observable.rangeLong(3,8).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("lanjiabinRx", "開始採用subscribe連接");
}
@Override
public void onNext(Long aLong) {
Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
}
@Override
public void onError(Throwable e) {
Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
}
@Override
public void onComplete() {
Log.v("lanjiabinRx", "接受的事件 onComplete");
}
});
}
結果:
編程中我們會遇到多少挫折?表放棄,沙漠盡頭必是綠洲。