更多文章請點擊鏈接:http://77blogs.com/?p=162 轉載請標明出處:https://www.cnblogs.com/tangZH/p/12088300.html,http://77blogs.com/?p=162 RxJava究竟是啥,從根本上來講,它就是一個實現非同步操作的庫,並 ...
更多文章請點擊鏈接:http://77blogs.com/?p=162
轉載請標明出處:https://www.cnblogs.com/tangZH/p/12088300.html,http://77blogs.com/?p=162
RxJava究竟是啥,從根本上來講,它就是一個實現非同步操作的庫,並且能夠使代碼非常簡潔。它的非同步是使用觀察者模式來實現的。
關於觀察者模式的介紹,可以看我這一篇文章:
https://www.cnblogs.com/tangZH/p/11175120.html
這裡我主要講RxJava的一些基本用法,基本案例,原理的話暫時不深究:
一、自己構造事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter emitter) {
int i = getNumber();
if (i < 0) {
emitter.onComplete();
return;
} else {
Log.d(TAG, Thread.currentThread().getName());
emitter.onNext(i);
emitter.onComplete();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, Thread.currentThread().getName());
Log.d(TAG, integer + "");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable
和 Observer
通過 subscribe()
方法實現訂閱關係,從而 Observable
可以在需要的時候發出事件來通知 Observer
。
onNext():方法用來發送事件。
下麵看看其他兩個方法:
onCompleted()
: 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的onNext()
發出時,需要觸發onCompleted()
方法作為標誌。onError()
: 事件隊列異常。在事件處理過程中出異常時,onError()
會被觸發,同時隊列自動終止,不允許再有事件發出。- 在一個正確運行的事件序列中,
onCompleted()
和onError()
有且只有一個,並且是事件序列中的最後一個。需要註意的是,onCompleted()
和onError()
二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
講一下我們上面的例子,上面這個例子是採用簡潔的鏈式調用來寫的:
首先使用 create()
方法來創建一個 Observable ,併為它定義事件觸發規則,然後通過emitter.onNext(i)傳遞出來,.subscribeOn(Schedulers.io())便是指定該事件產生的所在的線程為子線程,.observeOn(AndroidSchedulers.mainThread())指定觀察者執行的線程為主線程。這時候為止返回的對象為Observable對象。
然後該Observable對象subscribe綁定觀察者(也就是觀察者進行訂閱),裡面有接收被觀察者發出來的事件,有一個成功的方法,和一個失敗的方法,這樣就實現了由被觀察者向觀察傳遞事件。
二、對集合里的數據進行變換
List<Integer> list = new ArrayList<Integer>() {
{
add(0);
add(1);
add(2);
}
};
Observable.fromIterable(list).map(new Function() {
@Override
public Object apply(Object o) throws Exception {
int i = (int) o + 1;
return String.valueOf(i);
}
})
.toList()
.toObservable().subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, o.toString());
}
});
且看,我們需要對某個集合裡面的數據一一進行變換,然後發送出來執行其他操作。
上面便是對集合裡面的每一項進行加一操作,然後再轉換為String類型,然後toList(),組合成集合發送出來,最後在觀察者方法中列印出每一項。
三、合併執行
定義兩個被觀察者,各自產生事件,然後合併在一起,發送給一個觀察者。
首先定義我們上面第一個例子的被觀察者,用於發送一個數字:
Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter emitter) {
int i = getNumber();
if (i < 0) {
emitter.onComplete();
return;
} else {
Log.d(TAG, Thread.currentThread().getName());
emitter.onNext(i);
emitter.onComplete();
}
}
})
.subscribeOn(Schedulers.io());
其次再定義我們上面第二個例子的被觀察者:
List<Integer> list = new ArrayList<Integer>() {
{
add(0);
add(1);
add(2);
}
};
Observable observable2 = Observable.fromIterable(list).map(new Function() {
@Override
public Object apply(Object o) {
int i = (int) o + 1;
return String.valueOf(i);
}
})
.toList()
.toObservable().subscribeOn(Schedulers.io());
最後將這兩個被觀察者的事件合併起來發送給一個觀察者:
Disposable disposable = Observable.zip(observable1, observable2, new BiFunction() {
@Override
public Object apply(Object o, Object o2) throws Exception {
int i = (int) o;
String k = (String) ((List) o2).get(0);
return k + i;
}
})
.subscribe(new Consumer() {
@Override
public void accept(Object o) {
Log.d(TAG, (String) o);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
Log.d(TAG, throwable.getMessage());
}
});
zip方法,顧名思義,有點類似與於打包的意思。
o為被觀察者1返回的結果,o2為被觀察2返回的結果,將這兩個結果一起處理後發送給觀察者。列印出來。
現在先介紹這幾個,找個時間再整理一些其他的用法以及原理實現。