RxJava2 使用 及 源碼閱讀

来源:https://www.cnblogs.com/xiaxveliang/archive/2020/03/02/12396110.html
-Advertisement-
Play Games

RxJava2 使用 及 源碼閱讀 RxJava是什麼?根據RxJava在GitHub上給出的描述: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event based pro ...


RxJava2 使用 及 源碼閱讀

RxJava是什麼?根據RxJava在GitHub上給出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java

大致意思是:
RxJava—一個可以在JVM上運行的,基於觀察者模式 實現非同步操作的java庫。

RxJava的作用:
就是非同步RxJava的使用,可以使“邏輯複雜的代碼”保持極強的閱讀性。

Rxjava github地址

RxAndorid的作用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了AndroidSchedulers.mainThread(),Android開發者使用過程中,可以輕鬆的將任務post Andorid主線程中,執行頁面更新操作。

RxAndroid github地址

使用方式

1、Observable

  • Observable:被觀察者
  • Observer:觀察者,可接收Observable發送的數據

a、Rxjava 實現線程切換:

//
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        //1、“非同步線程” 執行耗時操作
        //2、“執行完畢” 調用onNext觸發回調,通知觀察者
        e.onNext("1");
        e.onComplete();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 訂閱線程  訂閱的那一刻在訂閱線程中執行
            }

            @Override
            public void onNext(String value) {
                // “主線程”執行的方法
            }

            @Override
            public void onError(Throwable e) {
                // "主線程"執行的方法
            }

            @Override
            public void onComplete() {
                // "主線程"執行的方法
            }
        });

b、Rxjava 使用操作符

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // IO 線程
        // 請求網路數據
        e.onNext("123456");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        // IO 線程
        // 網路數據解析(數據轉化)
        //
        // throw new RequestFailException("獲取網路請求失敗");
        return 123;
    }
}).doOnNext(new Consumer<Integer>() {    //保存登錄結果UserInfo
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // IO 線程
        // 保存網路數據

    }
}).subscribeOn(Schedulers.io())   //IO線程
.observeOn(AndroidSchedulers.mainThread())  //主線程
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // 更新UI
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        // 錯誤 顯示錯誤頁面
    }
});

2、Flowable

Flowable是為了應對Backpressure產生的。
Flowable是一個被觀察者,與Subscriber(觀察者)配合使用

//
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        //1、“非同步線程” 執行耗時操作
        //2、“執行完畢” 調用onNext觸發回調,通知觀察者
        emitter.onNext(0);
        emitter.onComplete();
    }
    // 若消費者消費能力不足,則拋出MissingBackpressureException異常
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // 訂閱時執行,發生在“訂閱線程”
                // 這個方法是用來向生產者申請可以消費的事件數量
                // 這裡表明消費者擁有Long.MAX_VALUE的消費能力
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                // “主線程”執行的方法
            }

            @Override
            public void onError(Throwable t) {
                // "主線程"執行的方法
            }

            @Override
            public void onComplete() {
                // "主線程"執行的方法
            }
        });

a、 Backpressure(背壓)

Backpressure(背壓)生產者的生產速度大於消費者的消費能力引起的問題。

在RxJava中有一種情況就是被觀察者發送消息十分迅速以至於觀察者不能及時的響應這些消息

例如:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // “非同步線程”中 生產者有無限的生產能力
        while (true){
            e.onNext(1);
        }
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // “主線程”中 消費者消費能力不足,從而造成事件無限堆積,最後導致OOM
        Thread.sleep(2000);
        System.out.println(integer);
    }
});

非同步線程中 生產者有無限的生產能力;
主線程中 消費者消費能力不足,從而造成事件無限堆積,最後導致OOM。

上述的現象,有個專有的名詞來來形容,即:Backpressure(背壓)

b、Subscription.request(long n);

Subscription.request(long n) 方法是用來向生產者申請可以消費的事件數量

  • 當調用了request(long n)方法後,生產者便發送對應數量的事件供消費者消費;
  • 如果不顯示調用request就表示消費能力為0

在非同步調用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的數據,緩存池的預設大小為128,即只能緩存128個事件。
無論request()中傳入的數字比128大或小,緩存池中在剛開始都會存入128個事件;當然如果本身並沒有這麼多事件需要發送,則不會存128個事件。

  • BackpressureStrategy.ERROR策略下,如果生產者生產的事件大於128個,緩存池便會溢出,從而拋出MissingBackpressureException異常;
  • BackpressureStrategy.BUFFER策略:將RxJava中預設的128個事件的緩存池換成一個更大的緩存池,這樣,消費者通過request()即使傳入一個很大的數字,生產者也會生產事件。但是這種方式比較消耗記憶體,除非是我們比較瞭解消費者的消費能力,能夠把握具體情況,不會產生OOM。總之BUFFER要慎用。
  • BackpressureStrategy.DROP策略:當消費者處理不了事件,則丟棄。消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。
  • BackpressureStrategy.LATEST策略: LATEST與DROP功能基本一致。消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最後一個事件。

源碼閱讀——簡單例子 (一)

註:當前使用的源碼版本 rxjava:2.1.9

從這段不涉及操作符和線程切換的簡單例子開始:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String o) {

    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError data is :" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 訂閱
observable.subscribe(observer);

a、ObservableOnSubscribe.java

先看一下ObservableOnSubscribe.java這個類

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

由代碼可知 ObservableOnSubscribe是一個回調介面,回調方法中參數為ObservableEmitter,下邊看一下ObservableEmitter 這個類。

ObservableEmitter.java

ObservableEmitter字面意思是被觀察者發射器,看一下源碼:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter是對Emitter的擴展,而擴展的方法正是 RxJava2.0 之後引入的。提供了可中途取消等新能力,我們看 Emitter 源碼:

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

Emitter字面意思是發射器,這裡邊的三個方法,大家都很熟悉了。其對應了以下這段代碼:

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}

回調說完,下邊我們來看Observable.create(ObservableOnSubscribe<T> source) 這段代碼。

b、Observable.create(ObservableOnSubscribe source)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugins 先忽略
  • 我們看到傳入的ObservableOnSubscribe被用來創建ObservableCreate,其實ObservableCreate就是Observable的一個實現類

因此 Observable.create(ObservableOnSubscribe<T> source) 這段代碼,實際是:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
  • 這裡我們知道:當 ObservableOnSubscribe.subscribe 方法被執行時,用戶通過調用ObservableEmitter.onNext方法,將數據發送出去(發送給觀察者)

下邊我們看一下ObservableCreate 這個類

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • ObservableOnSubscribe.subscribe 方法是在ObservableCreate.subscribeActual 方法中第四行中被執行了;subscribe方法中,用戶通過調用ObservableEmitter.onNext方法,將數據發送出去;
  • subscribeActual方法第二行,調用了observer.onSubscribe(parent);方法。 訂閱發生時,在訂閱線程主動執行了observeronSubscribe方法;
  • CreateEmitter 是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;
  • CreateEmitter的作用是任務取消時,可以不再回調其封裝的觀察者;observeronNext方法,由CreateEmitter.onNext方法調用;

Observable.create(ObservableOnSubscribe<T> source); 方法最終返回一個 ObservableCreate 對象。
下邊看 observable.subscribe(observer); 方法

c、observable.subscribe(observer);

  • observable.subscribe(observer); 即 訂閱發生的那一刻。
  • 這裡 observable.subscribe(observer); 實際是ObservableCreate.subscribe(observer);

下邊查看Observablesubscribe(observer)方法

Observable.subscribe(Observer observer)

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // Observable的subscribe方法,實際執行的是subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        //
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
  • 調用 observable.subscribe(observer); 方法時,實際是調用了observable.subscribeActual(observer) 方法。
  • observableObservableCreate的引用,因此這裡調用的是ObservableCreate.subscribeActual(observer) 方法。

我們又回到 ObservableCreate 這個類的subscribeActual方法

ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //  subscribeActual 方法在 訂閱發生的那一刻被調用 既 observable.subscribe(observer);時被調用
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回調
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • subscribeActual 方法在 訂閱發生的那一刻被調用的;在 observable.subscribe(observer); 時被調用;
  • observer.onSubscribe(parent); 訂閱發生時,在訂閱線程回調observeronSubscribe方法
  • subscribeActual 方法中,傳入的Observer會被包裝成一個CreateEmitter;若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回調;

subscribeActual 中第二行代碼 observer.onSubscribe(parent);

observer.onSubscribe(parent); 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法,這裡回到了以下代碼

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }
    // ... 省略onNext、onError、onComplete
};
  • 這裡傳入的參數為 new CreateEmitter<T>(observer) ,其實現了Disposable介面,若任務取消,則不回調傳入的觀察者observer 對應的onNext 、onError、onComplete 等方法

subscribeActual 中第四行代碼 source.subscribe(parent);

source.subscribe(parent);ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));

代碼最終回到ObservableOnSubscribesubscribe :

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}
  • subscribe中,調用到 CreateEmitter 類的onNext 、onComplete、onError 方法,將數據發送CreateEmitter中的觀察者

到此,“這段不涉及操作符和線程切換的簡單例子” 的代碼跟蹤結束。

源碼閱讀——線程切換 (二)

註:當前使用的源碼版本 rxjava:2.1.9

從這段線程切換的簡單例子開始:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱線程  訂閱的那一刻在訂閱線程中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主線程中執行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主線程中執行
    }

    @Override
    public void onComplete() {
        // Android 主線程中執行
    }
};

// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 被觀察者 IO 線程
observable = observable.subscribeOn(Schedulers.io());
// 觀察者  Android主線程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 訂閱
observable.subscribe(observer);

先來個我總結的RxJava2的整個代碼執行流程:
這裡寫圖片描述

a、Observable.create(ObservableOnSubscribe source)

源碼閱讀——簡單例子 (一) 中我們瞭解到了Observable.create(ObservableOnSubscribe<T> source)實際是 如下代碼:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
  • ObservableCreate 中含有一個subscribeActual(observer) 方法,用於執行傳入觀察者的observer.onSubscribe方法,和間接調用 觀察者的onNext、onComplete 等方法;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • subscribeActual方法第二行,調用了傳入的觀察者的observer.onSubscribe(parent);方法; 訂閱發生時,在訂閱線程主動執行了observeronSubscribe方法;
  • subscribeActual方法第四行,調用了傳入的觀察者的observer.subscribe 方法;subscribe方法中,用戶通過調用CreateEmitter.onNext方法,將數據發送出去;
  • CreateEmitter 是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;
  • CreateEmitter的作用是任務取消時,可以不再回調其封裝的觀察者;observeronNext方法,由CreateEmitter.onNext方法調用;

下邊查看observable.subscribeOn(Schedulers.io())相關代碼

註:
ObservableEmitterCreateEmitter的引用,是對Observer的進一步封裝。CreateEmitter在執行onNext時,如果任務取消,則不再回調ObserveronNext方法。

b、observable.subscribeOn(Schedulers.io())

下邊我們查看Observable 類的subscribeOn(Scheduler scheduler)方法

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 生成一個ObservableSubscribeOn對象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
  • 繼續忽略RxJavaPlugins
  • 最終返回一個ObservableSubscribeOn 對象

這裡Observable observable = observableCreate.subscribeOn(Schedulers.io())代碼實際是

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
  • 因此 observable.subscribeOn(Schedulers.io()) 返回的是一個ObservableSubscribeOn 的引用

下邊查看ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ... 省略部分代碼
}

看一下ObservableSubscribeOn中的subscribeActual 方法

  • subscribeActual 方法第二行代碼中,執行了傳入ObserveronSubscribe 方法;
  • subscribeActual 方法第三行: 在 scheduler 對應的IO線程中,執行observableCreatesubscribe 方法,傳入參數為SubscribeOnObserver,即:IO線程中 執行observableCreate.subscribe(new SubscribeOnObserver(observer));

因此,無論ObservableSubscribeOn.subscribeActual(observer)在哪個線程中被調用observableCreate.subscribe(new SubscribeOnObserver<T>(observer))均在IO線程中執行,因此觀察者的e.onNext("hello"); e.onComplete(); 亦在IO線程中執行;

c、observable.observeOn(AndroidSchedulers.mainThread())

下邊我們查看Observable 類的observeOn(Scheduler scheduler)方法

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
// 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

這裡可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())實際是:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);

因此 ,observable.observeOn(AndroidSchedulers.mainThread()) 返回的是ObservableObserveOn 的引用。

下邊查看ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}

看一下ObservableObserveOn中的subscribeActual 方法

  • subscribeActual 方法第五行代碼,實際為observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  • ObserveOnObserver 的作用是在ObserveOnObserveronNext方法被實行時;將observeronNext方法post到 Android主線程中;

d、observable.subscribe(observer)

  • 我們知道Observablesubscribe(Observer<? super T> observer)方法,實際調用到了ObservablesubscribeActual(Observer<? super T> observer) 方法;
  • 而這裡的observable 實際是ObservableObserveOn的引用;

因此,observable.subscribe(observer)實際執行的是observableObserveOn.subscribeActual(observer)

到這裡,我們 線程切換 (二) 的小例子變換為了以下代碼:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱線程  訂閱的那一刻在訂閱線程中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主線程中執行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主線程中執行
    }

    @Override
    public void onComplete() {
        // Android 主線程中執行
    }
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);

下邊我們查看observableObserveOn.subscribeActual(observer)

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source 為 observableSubscribeOn
        super(source);
        // scheduler 為AndroidSchedulers.mainThread()
        this.scheduler = scheduler;
        // false
        this.delayError = delayError;
        // 128
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // AndroidSchedulers.mainThread() 為 HandlerScheduler,因此會走到else部分代碼
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        }
        // 代碼會走到else 部分
         else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 為 observableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}
  • subscribeActual 方法中,AndroidSchedulers.mainThread()HandlerScheduler ,因此 if 中的判斷語句直接忽略,直接走到代碼的 else 部分。
  • subscribeActual 方法中,將觀察者observer封裝成了ObserveOnObserver;並且調用observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
  • observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))實際是
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“訂閱線程中” —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 執行subscribe ;IO線程 subscribe方法中,用戶主動調用ObserveOnObserver的onNext、onError、onComplete方法,將數據發出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
  • 用戶調用SubscribeOnObserveronNext 是將數據發送出去
  • SubscribeOnObserver.onNext調用了observeOnObserver.onNext
  • observeOnObserver.onNext通過HandlerSchedulerobserver.onNext、observer.onError、observer.onComplete 等方法post到Android主線程中執行。

e、整體流程圖如下

最後總結一下RxJava2的整個執行流程:

這裡寫圖片描述

參考

手把手教你使用 RxJava 2.0(一)
RxJava2 源碼解析(一)
RxJava2 源碼解析——流程

========== THE END ==========

wx_gzh.jpg


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 查詢中出現兩個表的連接,下麵通過實例來講解一下各種連接查詢的不同之處 表 a,和表b 如下圖 a 表中 有 abcd b表中有 abcf 內連接: 得出結果 如圖,選擇等值的結果(abc) 左連接: 查詢結果如圖,選擇a表為基準。(abcd) 右連接: 查詢結果如圖,選擇a表為基準。(abcf) 全 ...
  • 第一步 下載安裝包: 官網 畢竟是甲骨文公司的產品,去官網下真的慢! 這裡有兩個供選擇的,我建議選第一個(因為我先下了第二個,結果失敗了,不知道為什麼總是出錯。) 下載完自行選擇路徑解壓就可以了。 第二步 配置my.ini: 在根目錄下新建文本文件,將下麵的複製上,註意修改尾碼為.ini [mysq ...
  • 疫情已經持續了好幾個月了,作為程式員滴我們也幫不上什麼忙,只有老老實實呆在家裡或者出門一定戴口罩準守一些規則,不給國家添亂。不過最近疫情開始有所扭轉,但是還是對國家經濟,對企業業務造成了很大的影響,我也被停止了實習。接下來,可能會面臨著失業,破產等等嚴肅的問題。但是我們還是需要繼續學習,提高自己的競 ...
  • Oracle體繫結構 實例: 一個操作系統只有一個 Oracle 資料庫 一個 Oracle 資料庫可以有多個 Oracle 實例(通常只安裝一個實例) 一個實例對應著一系列的後臺進程和記憶體結構 表空間: 一個實例在邏輯上可以分成若幹個表空間 表空間是 Oracle 對數據文件的邏輯映射 表空間不屬 ...
  • Socket通信有兩種主要方式:TCP協議和UDP協議,兩者區別是TCP協議要首先和接收方要建立連接然後發送數據,這樣數據能保證送達,但速度較慢;UDP協議首先把數據打包,然後直接發送到接收方,無需建立連接誒,速度快,但容易丟失數據。這裡是一個簡單的基於TCP協議的通信實例: 直接上代碼: 首先是j ...
  • 去年計劃完成移動互聯網技術開發三部曲:微信小程式開發、iOS App開發和Android App開發的。故系列文章命名為:一個人開發一個App……開頭。 ...
  • 一、摘要 1.七牛上傳文件,用hash來唯一標識七牛存儲空間中的某個文件,該hash是以ETag演算法計算出的一段哈希值; 2.演算法介紹:https://developer.qiniu.com/kodo/manual/1231/appendix; 3.七牛的提供的實現語言中(https://githu ...
  • Android JsBridge源碼學習 眾所周知Android 4.2以下的WebView存在addJavascriptInterface漏洞的問題,不太瞭解的同學可參考 "Android4.2下 WebView的addJavascriptInterface漏洞解決方案" "@Javascript ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...