RxJava的核心內容很簡單,就是進行非同步操作。類似於Handler和AsyncTask的功能,但是在代碼結構上不同。 RxJava使用了觀察者模式和建造者模式中的鏈式調用(類似於C#的LINQ)。 觀察者模式:Observable(被觀察者)被Observer(觀察者)訂閱(Subscribe)之 ...
RxJava的核心內容很簡單,就是進行非同步操作。類似於Handler和AsyncTask的功能,但是在代碼結構上不同。
RxJava使用了觀察者模式和建造者模式中的鏈式調用(類似於C#的LINQ)。
觀察者模式:Observable(被觀察者)被Observer(觀察者)訂閱(Subscribe)之後,Observable在發出消息的時候會通知對應的Observer,並且,一個Observable可以有被多個Observer訂閱。
鏈式調用:和Builder模式類似,調用對應的方法對原對象進行處理後返回原對象,從而做到鏈式調用。
這裡介紹一些下麵會用到的名詞:
- Observable:被觀察者,也就是消息的發送者
- Observer:觀察者,消息的接收者
- Subscriber:訂閱者,觀察者的另一種表示
- Scheduler:調度器,進行線程切換
先從簡單入手模擬一下這個過程:
先配置依賴(版本為當前最新):
dependencies { ... compile 'io.reactivex:rxjava:1.1.8' compile 'io.reactivex:rxandroid:1.2.1' }
接著在工程的onCreate方法中直接加入如下代碼:
1 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 subscriber.onNext("hello"); 5 subscriber.onNext("android"); 6 subscriber.onCompleted(); 7 } 8 }); 9 10 Subscriber<String> subscriber = new Subscriber<String>() { 11 @Override 12 public void onCompleted() { 13 Log.d(TAG, "onCompleted() called"); 14 } 15 16 @Override 17 public void onError(Throwable e) { 18 Log.d(TAG, "onError() called with: e = [" + e + "]"); 19 } 20 21 @Override 22 public void onNext(String s) { 23 Log.d(TAG, "onNext() called with: s = [" + s + "]"); 24 } 25 }; 26 27 observable.subscribe(subscriber);
1-8行創建了一個Observable對象,這個對象可以通過create方法來創建,傳入一個OnSubscribe的實現類,併在其onCall方法中調用subscriber的onNext方法進行消息的發送,分別發送兩個字元串然後調用onComplete方法表示發送完成。
10-25行創建了一個Subscriber(訂閱者)對象,這個類是Observer的子類,Observer是傳統的觀察者,而Subscriber則豐富了Observer,Subsciber添加了兩個方法,分別是:
- onStart:在Subscriber和Observable連接之後,消息發送之前調用,也就是先於onNext方法調用
- unsubscirbe:解除訂閱關係,Subscriber不再收到從Observable發送的消息
如果不需要用到這兩個方法,直接創建Observer也是可以的,創建的方法類似Subscriber。
27行調用了Observable的subscribe方法對Subscriber進行綁定。
運行程式會列印如下的Log:
最簡單的流程就是這樣,如果上面代碼中的局部變數不是必須的話,代碼可以變成如下所示:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("android"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted() called"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError() called with: e = [" + e + "]"); } @Override public void onNext(String s) { Log.d(TAG, "onNext() called with: s = [" + s + "]"); } });
效果是完全一樣的,相信可以看得明白,後面這種形式會比較常見。
接著要介紹兩個類——Actionx和Funcx(這兩個類最後的x為數字)。
Actionx:指的是一組操作(用於簡化Subscriber),而這組操作有x個參數,並且返回Void
Funcx:指的是一組方法(用於對消息進行處理,下麵的變換會使用到),這個方法有x個參數和1個返回值
這裡還是和上面例子一樣,假設我們只關心Subscriber的onNext方法,而不關心onCompleted和onError方法,那麼這一個Subscriber就可以被替換為一個Action1,代碼如下:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("android"); subscriber.onCompleted(); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d(TAG, "call() called with: s = [" + s + "]"); } });
如果我們只關心onNext和onError,則可以用兩個Action1來表示,因為subscribe的重載允許我們這麼做,可以自行嘗試。
因為RxJava的內容其實比較多,這裡就對其中一些進行介紹,有興趣的可以轉到文章最後的地方會給出一些參考文章。
① 創建操作
常見的創建操作有幾種:Create、From、Just和Interval(還有其他可以參考文末文章)
上面的例子中直接使用了create方法進行Observable的創建,下麵可以通過just來進行簡單的創建,代碼如下:
1 Observable.just("hello", "world").subscribe(new Action1<String>() { 2 @Override 3 public void call(String s) { 4 Log.d(TAG, "call() called with: s = [" + s + "]"); 5 } 6 });
和例子中的效果是一樣的,但是這裡要註意了,just方法可以傳入任何類型的對象,但是必須和Action1或者Subscriber所指定的泛型一致。
也就是假設我們要列印的不是字元串而是數字,代碼變成如下所示:
1 Observable.just(1, 2).subscribe(new Action1<Integer>() { 2 @Override 3 public void call(Integer s) { 4 Log.d(TAG, "call() called with: s = [" + s + "]"); 5 } 6 });
※ Action1所指定的泛型由String改為Integer,call方法的參數也相應的更改。
from方法也可以創建Observable,用法可以自行嘗試。
這裡說一下Interval方法,Android開發中可能會用到。Interval是定時器,每隔一段時間發送一個消息,這個消息由一個Long類型的值表示,從0開始。代碼:
1 Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() { 2 @Override 3 public void call(Long aLong) { 4 Log.d(TAG, "call() called with: s = [" + aLong + "]"); 5 } 6 });
運行代碼之後,Logcat會每隔一秒列印一句Log,而aLong的值會從0開始每次遞增1。interval方法的參數分別是:間隔值、間隔單位,上面代碼指的是每隔1s觸發一次。
光有計時器還不行,我們怎麼讓它自動停止呢,比如說列印5次。這裡涉及到RxJava的過濾問題,就是使用take方法並傳入發送次數,當發送次數大於這個值的時候,其他的消息都被過濾了,我們的Subscriber不再接收。代碼如下:
1 Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Action1<Long>() { 2 @Override 3 public void call(Long aLong) { 4 Log.d(TAG, "call() called with: s = [" + aLong + "]"); 5 } 6 });
※ 這裡的take方法不能在subscribe方法後調用,因為subscribe方法返回值不再是Observable
當然,take方法還有其他重載,可以自行嘗試。
② 變換操作
變換操作主要是對Observable發送的消息進行變換,使得在Subscriber中處理起來更加簡單。就通過上面的例子來說明吧,我們設定了的計時器中,每次返回給Subscriber(Action1)的值是Long類型的,假設我們要在接收端得到一個Integer類型的值,怎麼辦呢?就要使用變換,最常用的變換就是map。
1 Observable.interval(1, TimeUnit.SECONDS).take(5).map(new Func1<Long, Integer>() { 2 @Override 3 public Integer call(Long aLong) { 4 return aLong.intValue(); 5 } 6 }).subscribe(new Action1<Integer>() { 7 @Override 8 public void call(Integer integer) { 9 Log.d(TAG, "call() called with: s = [" + integer + "]"); 10 } 11 });
可以看到這個時候Action1使用的泛型變為Integer而不是Long,因為我們在subscribe方法之前調用了map方法對消息進行變換,可以看到,變換需要傳入一個Funcx的對象,並實現一個call方法將Long類型的數據轉換為Integer類型。希望你還記得Func1表示含有一個參數和一個返回值。
接著再說一個比較常用的,名字是flatmap。它的作用,是把一個數據類型變成一個Observable對象。也就是說Observable裡面包含Observable,而新建出來的Observable中的消息會交付給原來的Observable進行發送。如果不明白,可以看例子:
假設有兩個類,一個是學生(姓名、選課),一個是課程(分數),學生類中的選課是數組類型,要求是列印所有學生的每個選課成績:
private class Student{ String name; Course[] courses; Student(String name, Course[] courses) { this.name = name; this.courses = courses; } } private class Course{ int score; private Course(int score) {this.score = score;} }
如果我們上面的做法,代碼如下:
1 Student s1 = new Student("Jack", new Course[]{new Course(80), new Course(79)}); 2 Student s2 = new Student("Rose", new Course[]{new Course(87), new Course(69)}); 3 4 Observable.just(s1, s2).subscribe(new Action1<Student>() { 5 @Override 6 public void call(Student student) { 7 for (int i = 0; i < student.courses.length; i++) { 8 Log.d(TAG, "score = " + student.courses[i].score); 9 } 10 } 11 });
就是在接收端接收到一個Student類型的值,再使用for迴圈列印對應屬性的值。
但是如果我們不想使用for迴圈,而是希望直接再接收端就可以處理對應的成績,就要用到flatmap。代碼如下:
1 Observable.just(s1, s2).flatMap(new Func1<Student, Observable<Course>>() { 2 @Override 3 public Observable<Course> call(Student student) { 4 return Observable.from(student.courses); 5 } 6 }).subscribe(new Action1<Course>() { 7 @Override 8 public void call(Course course) { 9 Log.d(TAG, "score = " + course.score); 10 } 11 });
※ Observable的泛型改為我們要轉換的類型Course,表示我們把兩個Student對象的每個Course發送出去。在Func1中的call方法中根據傳入的Student對象的courses屬性構造一個Observable並返回,RxJava會幫我們把每個Course逐個發送出去,log如下所示:
這裡如果你覺得還是不夠,你想把每個成績變成Integer再發送,該怎麼做?
我們可以在返回Observable之前再進行一次map轉換,這個時候得到的就是Integer了,記得更改Observable對應的泛型為Integer哦,代碼如下:
1 Observable.just(s1, s2).flatMap(new Func1<Student, Observable<Integer>>() { 2 @Override 3 public Observable<Integer> call(Student student) { 4 return Observable.from(student.courses).map(new Func1<Course, Integer>() { 5 @Override 6 public Integer call(Course course) { 7 return course.score; 8 } 9 }); 10 } 11 }).subscribe(new Action1<Integer>() { 12 @Override 13 public void call(Integer score) { 14 Log.d(TAG, "score = " + score); 15 } 16 });View Code
變換操作還有其他一些,可以自行瞭解嘗試。
③ 過濾操作
在上面的計時器的例子中我們已經使用了過濾操作,take算是比較簡單的過濾了。這裡簡單介紹一下debounce,它可以讓我們過濾掉一段時間內的消息。
設定了debounce之後,在一個消息發送之後的一段時間之內發送的消息將會被過濾拋棄掉。這裡沒想到更好的例子,原諒我:
1 Observable.interval(1, TimeUnit.SECONDS).debounce(2, TimeUnit.SECONDS) 2 .subscribe(new Action1<Long>() { 3 @Override 4 public void call(Long aLong) { 5 Log.d(TAG, "call() called with: aLong = [" + aLong + "]"); 6 } 7 });
可以看到我們設置的定時,每隔1s發送一個消息,而又給它設定了debounce為2s,什麼意思呢,就是間隔小於2s的消息都被拋棄,所以這段代碼沒有任何輸出。
實際上,RxJava還有其他一些操作,這裡先忽略。說一些Android開發有關的。
我們的非同步操作一般是用來進行網路請求的,或者進行耗時操作,那麼這個時候肯定不能在主線程中進行,而RxJava的一大優點是,我們可以任意的切換線程,並且,切換起來非常方便。
為了驗證所在的線程,我們先編寫一個方法來獲取當前線程的id:
1 private long getCurrentThreadId() { 2 return Thread.currentThread().getId(); 3 }
接著我們先驗證一般情況下消息的發送和接收:
1 Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 Log.d(TAG, "Thread id of sending message is " + getCurrentThreadId()); 5 subscriber.onNext("hello"); 6 } 7 }).subscribe(new Action1<String>() { 8 @Override 9 public void call(String s) { 10 Log.d(TAG, "Thread id of receiving message is " + getCurrentThreadId()); 11 } 12 });
這段代碼很簡單,分別在發送和接收的地方列印當前的線程id,log如下:
那麼該如何切換線程呢?這裡要用到兩個方法:
- subscribeOn:指定事件發生的線程
- observeOn: 指定事件接收的線程
這兩個方法都需要傳入Scheduler對象,而這個對象可以通過Schedulers和AndroidSchedulers來獲取。
- Scheduler.newThread():始終開啟新的線程
- Scheduler.immediate():當前線程(預設)
- Scheduler.io():IO線程,如網路下載,文件讀寫
- Scheduler.computation():計算線程,如圖形計算等
- AndroidSchedulers.mainThread():Android中的UI線程
- AndroidSchedulers.from(Looper looper):根據Looper來獲取對應的線程,這裡可以根據Handler來取得Handler對應的線程
如果我們需要在新的線程中進行消息發送(網路下載),在UI線程中更新UI,那麼代碼應該如下所示:
1 Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 Log.d(TAG, "Thread id of sending message is " + getCurrentThreadId()); 5 subscriber.onNext("hello"); 6 } 7 }) 8 .subscribeOn(Schedulers.io()) 9 .observeOn(AndroidSchedulers.mainThread()) 10 .subscribe(new Action1<String>() { 11 @Override 12 public void call(String s) { 13 Log.d(TAG, "Thread id of receiving message is " + getCurrentThreadId()); 14 } 15 });
這個時候Log如下所示:
可以看到,現在兩個操作已經處於不同線程了。
如果有看我的上一篇文章,文章中需要進行高斯模糊的計算,這個計算過程可能會需要一點時間,如果不想讓界面卡頓是建議開啟新的線程進行的,而這裡正好可以用到調度器的Computation,是不是很方便呢。
參考文章: