RxJava2已經推出有一年半的時間,由於之前RxJava已經在現有項目中廣泛使用,而RxJava2在除了很多命名外並沒有太多革新,所以相信有很多人跟我一樣都還沒有升級. 隨著老版本漸漸的失去維護,更重要的是有一定時間允許我來做這個遷移,其實棄老從新一直都是程式員的喜好. ...
如何從RxJava升級到RxJava2.
RxJava2已經推出有一年半的時間,由於之前RxJava已經在現有項目中廣泛使用,而RxJava2在除了很多命名外並沒有太多革新,所以相信有很多人跟我一樣都還沒有升級.
隨著老版本漸漸的失去維護,更重要的是有一定時間允許我來做這個遷移,其實棄老從新一直都是程式員的喜好.
雖然官方提供了文檔詳盡的介紹了區別,但是文章之長,可能很多人讀不下去,卻有想快速的遷移過來,我將除了命名改變之外有用的地方總結成了幾點,供大家參考.
不能再發射Null了
RxJava2的最大改變就是不能再流里發射Null了,有人會問發射了就怎麼了,答案是你的流會因為NPE
斷開.
比如以前我們會寫出這樣的代碼(詳見RxPermission):
Observable.just(null).compose....
在RxJava二中我們需要將它改為(詳見RxPermission2):
TRIGGER = new Object()
Observer.just(TRIGGER).compose(xxx)
還有我們常常完成某個工作而不需要返回值,或者根本不關心返回值,將返回的Observable定義為Observable
xxx.flatMap {
....
return null;
};
現在不能這麼寫了,對於不需要返回值的,我們應該使用Completable,當然這個在RxJava的時候也已經存在了.
xxx.flatMapCompletable { Completable.fromAction{ } }
還有我們在實現Local Cache與Remote Cache的時候常用的辦法:
localObservable = just(localReference);
concat(localObservable, remoteObservable).filter{ i != null }.first()...
會因為在沒有Local Cache的時候出錯,所以應該改成:
localObservable = just(Optional.fromNullable(localReference));
concat(localObservable, remoteObservable).filter{ i.isPresent() }.firstElement()/.first(defaultValue)...
flatMap方法多了
在上面的介紹中可能已經發現了,老版本只有同類型的flatMap,即Observable <-> Observable, Single <-> Single, 而RxJava2除了同類型的flatMap,還增添了flatMapCompletable,flatMapSingle,flatMapObservable幫助你任意切換.
訂閱與反訂閱
我們有時候需要在必要的時刻手動的將訂閱取消,而防止產生我們不想要的問題,如在跳出定位頁面時取消訂閱,防止位置信息後面回來造成程式崩潰.
而在RxJava中,我們一般是這麼做的:
Subscription subscription = xxxx.subscribe(xxxSubscriber);
subscription.unsubscribe();
在RxJava2中,這個發生了變化,因為你會發現subscribe
方法基本上都返回void的,如果你需要手動取消的話,需要使用T subscribeWith(T extends Disposal)
方法.
其實我們可以看到,新版的Subscriber
或者Observer
都多了一個方法void onSubscribe(Subscription s)
或者void onSubscribe(Disposable d)
, 也就是說以前的Subscription
是通過訂閱後通過回調返回了.
這裡RxJava2統一介面到Disposable
,提供dispose
方法進行反訂閱,並且還提供了DisposableObservable
,DisposableSingle
,DisposableCompletable
已經幫我們處理了回調返回的Disposable對象.
所以需要做的改動不大:
Disposable disposable = xxx.subscribeWith(xxxDisposableObserver);
disposable.dispose();
錯誤處理
錯誤處理最棒的一點是之前必須實現onError來handle錯誤,如果不實現,就會拋出OnErrorNotImplement
,導致程式崩潰,根據最新的Doc,在RxJava2中,可以輕鬆Handle未處理的錯誤.
RxJavaPlugins.setErrorHandler(xxx);
還有一點變化需要註意是的是,當你有並行任務的時候,如果一個線程出錯,將會導致整個流中斷,其他線程可能會拋出IOInterupedException
並且onError無法Handle,這時候必須有上面講到的ErrorHandler來處理這一類UnDeliveriedException
,否則程式會Crash.
Flowable
RxJava2將處理背壓(BackPressure)的部分抽出來弄了一個新的對象,叫做Flowable
.
以前我們處理背壓可能直接通過
xxx.onBackpressureXXXStrategy()...
就可以了.
現在我們得通過Flowable來處理.
xxx.toFlowable(XXXStrategy)...
當然Flowable還提供比較強大的新方法,來處理併發.
比如之前我們需要實現併發,得通過flatMap來實現.
Observable.from(urls).flatMap {
v -> Observable.just(v).subscribeOn(io()).....
}.subscribe(...)
使用Flowable,可以簡化為:
Flowable.fromIterable(listingIds)
.parallel().runOn(io())
.map { v -> xxx }
.sequential()
看起來是不是有點炫酷...
測試
對於RxJava2,任何一個Observable都可以轉化為一個TestObservable
, 通過...test()
來進行轉換.
而TestObservable提供很多與測試相關的方法,就不用我們親自去判斷.
如assertResult
,assertError
,assertSubscribed
.
其他改動
關於名字的變化,這裡都不一一論述,包含Func1
-> Function
, Action
-> Consumer
, Observable.Transformer
-> ObservableTransformer
等等.