觀察者模式的概念 觀察者A與被觀察者B建立訂閱關係,當被觀察者B發生某種改變時,立即通知觀察者A 添加依賴 基本模式 Observable被觀察者 註意各地方添加泛型避免大片警告,onNext()是事件的回調,onComplete()是事件的結尾。onComplete()與onError互斥需要保持 ...
觀察者模式的概念
觀察者A與被觀察者B建立訂閱關係,當被觀察者B發生某種改變時,立即通知觀察者A
添加依賴
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
基本模式
Observable被觀察者
註意各地方添加泛型避免大片警告,onNext()是事件的回調,onComplete()是事件的結尾。onComplete()與onError互斥需要保持唯一性,並只能調用一次。
Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("消息1"); e.onNext("消息2"); e.onNext("消息3"); e.onComplete(); } });
Observer觀察者
創建觀察者時回調的onSubscribe可以獲取Disposable對象,在合適的時候判斷條件,調用dispose()即可接觸訂閱關係
Observer<String> observer=new Observer<String>() { @Override public void onSubscribe(Disposable d) { //通過判斷解除訂閱關係 d.dispose(); } @Override public void onNext(String o) { //對應observable的onNext方法 } @Override public void onError(Throwable e) { //對應observable的onError方法 } @Override public void onComplete() { //對應observable的onComplete方法 } };
建立訂閱關係
observable.subscribeOn(Schedulers.io()) //指定事件生產在子線程 .observeOn(AndroidSchedulers.mainThread()) //指定事件消費在UI線程 .subscribe(observer);
Observable被觀察者的其他模式
//just模式,將自動發送onNext()事件 Observable<String> observable = Observable.just("發送消息"); //fromIterable模式,遍歷集合,並自動發送onNext()事件 Observable<String> observable = Observable.fromIterable((Iterable<String>) mList); //interval模式,定時自動發送整數序列,從0開始每隔2秒計數, Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS) //range模式,自動發送特定的整數序列,0表示不發送,負數會拋異常,從1開始發送到20 Observable<Integer> observable = Observable.range(1,20); //timer模式,定時執行觀察者的onNext()方法 Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);
Observable被觀察者的更多創建方式以及操作符
如創建操作,數據過濾操作,條件操作,轉載以下博客,很詳細:
RxJava操作符大全
Scheduler調度器
四種常見模式
Schedulers.immediate() 預設模式,在當前線程運行
Schedulers.newThread() 創建新的子線程運行
Schedulers.io() 創建新的子線程運行,內部使用的是無上限的線程池,可重用空閑的線程,效率高
AndroidSchedulers.mainThread() 在UI主線程運行
訂閱事件時的生產與消費線程
subscribeOn() 指定Observable(被觀察者)所在的線程,或者叫做事件產生的線程
observeOn() 指定 Observer(觀察者)所運行在的線程,或者叫做事件消費的線程
新的觀察者模式
Flowable被觀察者
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("hello RxJava!"); e.onComplete(); } },BackpressureStrategy.BUFFER);//增加背壓模式
Subscriber觀察者
onSubscribe()會返回Subscription對象,調用cancel()即可取消訂閱關係,request()即可指定消費事件的數量
Subscriber<String> subscriber=new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(String s) { Log.i("RxJava", "onNext: "+s); } @Override public void onError(Throwable t) { Log.i("RxJava", "onError"); } @Override public void onComplete() { Log.i("RxJava", "onComplete"); } }; flowable.subscribe(subscriber);//建立訂閱關係
Backpressure背壓模式
如果生產者和消費者不在同一線程的情況下,如果生產者的速度大於消費者的速度,就會產生Backpressure問題。即非同步情況下,Backpressure問題才會存在。
BUFFER
所謂BUFFER就是把RxJava中預設的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。這樣,消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。
但是這種方式任然比較消耗記憶體,除非是我們比較瞭解消費者的消費能力,能夠把握具體情況,不會產生OOM。
DROP
當消費者處理不了事件,就丟棄。
消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉
LATEST
LATEST與DROP功能基本一致,唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最後一個事件
ERROR
這種方式會在產生Backpressure問題的時候直接拋出一個異常,這個異常就是著名的MissingBackpressureException