RxJava2 入門詳細筆記

来源:https://www.cnblogs.com/leavesC/archive/2018/10/07/9749155.html
-Advertisement-
Play Games

一、概述 在 RxJava 中,一個實現了 介面的對象可以訂閱一個 類的實例。訂閱者對 發射的任何數據或數據序列作出響應。這種模式簡化了併發操作,因為它不需要阻塞等待 發射數據,而是創建了一個處於待命狀態的觀察者哨兵,哨兵在未來某個時刻響應 的通知。RxJava 提供了一套非同步編程的 API,並且支 ...


一、概述

在 RxJava 中,一個實現了 Observer 介面的對象可以訂閱一個 Observable 類的實例。訂閱者對 Observable 發射的任何數據或數據序列作出響應。這種模式簡化了併發操作,因為它不需要阻塞等待 Observable 發射數據,而是創建了一個處於待命狀態的觀察者哨兵,哨兵在未來某個時刻響應 Observable 的通知。RxJava 提供了一套非同步編程的 API,並且支持鏈式調用,所以使用 RxJava 編寫的代碼的邏輯會非常簡潔

RxJava 有以下三個最基本的元素:

  1. 被觀察者(Observable)
  2. 觀察者(Observer)
  3. 訂閱(subscribe)

創建被觀察者

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) {
                Log.e(TAG, "subscribe");
                Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

創建觀察者

        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };

完成觀察者與被觀察者之間的訂閱關係

     observable.subscribe(observer);

也可以以鏈式調用的方式來完成訂閱

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) {
                Log.e(TAG, "subscribe");
                Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });

最終的輸出結果是一樣的

    onSubscribe
    subscribe
    currentThread name: main
    onNext: 1
    onNext: 2
    onNext: 3
    onComplete

被觀察者發送的事件類型有以下幾種

事件種類 作用
onNext() 發送該事件時,觀察者會回調 onNext() 方法
onError() 發送該事件時,觀察者會回調 onError() 方法,當發送該事件之後,其他事件將不會繼續發送
onComplete() 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件之後,其他事件將不會繼續發送

下麵來講解 RxJava 中各種常見的操作符

二、創建操作符

2.1、create()

用於創建一個 Observable。一個正確的 Observable 必須嘗試調用觀察者的 onCompleted 方法或者 onError 方法有且僅有一次,而且此後不能再調用Observable 的任何其它方法

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) {
                Log.e(TAG, "subscribe");
                Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

2.2、just()

創建一個 Observable併發送事件,發送的事件總數不可以超出十個

        Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
    onSubscribe
    onNext: 1
    onNext: 2
    onNext: 3
    onComplete

2.3、fromArray

just() 類似,但 fromArray 可以傳入多於十個的變數,並且可以傳入一個數組

        Integer[] arrays = new Integer[]{1, 2, 3};
        Observable.fromArray(arrays).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });

2.4、fromCallable

這裡的 Callable 是指 java.util.concurrent 中的 CallableCallableRunnable 的用法基本一致,只是它包含一個返回值,這個結果值就是發給觀察者的

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 100;
            }
        });

2.5、fromFuture

這裡的 Future 是指 java.util.concurrent 中的 FutureFuture 的作用是增加了 cancel() 等方法操作 Callable,它可以通過 get() 方法來獲取 Callable 返回的值

        final FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 12;
            }
        });
        Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) {
                futureTask.run();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log.e(TAG, "accept: " + integer);
            }
        });

2.6、fromIterable()

用於發送一個 List 集合數據給觀察者

        List<Integer> integerList = new ArrayList<>();
        integerList.add(1);
        integerList.add(2);
        integerList.add(3);
        Observable.fromIterable(integerList).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log.e(TAG, "accept: " + integer);
            }
        });

2.7、defer()

defer 操作符會一直等待直到有觀察者訂閱它,然後它使用 Observable 工廠方法生成一個 Observable。它對每個觀察者都這樣做,因此儘管每個訂閱者都以為自己訂閱的是同一個 Observable ,實際上每個訂閱者獲取到的都是它們自己的單獨的數據序列。在某些情況下,直到發生訂閱時才生成 Observable 可以確保 Observable 包含最新的數據

    //全局變數
    private Integer value = 100;

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() {
                return Observable.just(value);
            }
        });
        value = 200;
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log.e(TAG, "accept: " + integer);
            }
        });
        value = 300;
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log.e(TAG, "accept: " + integer);
            }
        });
    accept: 200
    accept: 300

defer() 操作符能使得每次訂閱操作都創建被觀察者,因此兩次訂閱操作會創建不同的被觀察者對象,因此兩次列印操作返回的值並不一樣

2.8、timer()

延遲指定時間後會發送一個大小為 0L 的值給觀察者

       Observable.timer(2, TimeUnit.SECONDS)
           .subscribe(new Consumer<Long>() {
               @Override
               public void accept(Long aLong) {

               }
           });

2.9、interval()

每隔一段時間就發送一個事件,傳遞的值從 0 開始並不斷增 1

    Observable.interval(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) {
                Log.e(TAG, "value is: " + aLong);
            }
        });

2.10、intervalRange()

可以指定發送事件的開始值和數量,其他與 interval() 的功能一樣

            Observable.intervalRange(2, 3, 4, 5, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "onNext:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });

起始值從 2 開始遞增,事件共傳遞三次,第一次事件在訂閱後延遲 4 秒觸發,之後每次延遲 5 秒

10-06 10:48:40.017 17976-17976/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 10:48:44.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:2
10-06 10:48:49.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:3
10-06 10:48:54.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onNext:4
10-06 10:48:54.017 17976-17990/leavesc.hello.rxjavademo E/MainActivity: onComplete

2.11、range()

發送指定範圍的事件序列

            Observable.range(2, 5)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) {
                        Log.e(TAG, "values is :" + integer);
                    }
                });
    values is :2
    values is :3
    values is :4
    values is :5
    values is :6

2.12、rangeLong()

作用與 range() 一樣,只是數據類型是 Long

             Observable.rangeLong((2, 5)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) {
                        Log.e(TAG, "values is :" + aLong);
                    }
                });

2.13、empty() & never() & error()

empty() 直接發送 onComplete() 事件

        Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Object object) {
                Log.e(TAG, "onNext: " + object);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });

列印結果

    onSubscribe
    onComplete

換成 never()

onSubscribe

換成 error()

Observable.error(new Throwable("Hello")).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Object object) {
                Log.e(TAG, "onNext: " + object);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
    onSubscribe
    onError: Hello

三、轉換操作符

3.1、map()

map() 用於將被觀察者發送的數據類型轉變成其他的類型

    Observable.just(1, 2, 3)
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return "I'm " + integer;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Log.e(TAG, s);
            }
        });
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 1
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 2
10-06 10:53:16.364 18099-18099/leavesc.hello.rxjavademo E/MainActivity: I'm 3

3.2、flatMap()

用於將事件序列中的元素進行整合加工,返回一個新的被觀察者

        List<List<String>> listArrayList = new ArrayList<>();

        List<String> stringList = new ArrayList<>();
        for (int j = 0; j < 2; j++) {
            stringList.add("A_" + j);
        }
        listArrayList.add(stringList);

        stringList = new ArrayList<>();
        for (int j = 0; j < 2; j++) {
            stringList.add("B_" + j);
        }
        listArrayList.add(stringList);

        Observable.fromIterable(listArrayList).flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(List<String> stringList1) throws Exception {
                return Observable.fromIterable(stringList1);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "value is: " + s);
            }
        });
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: A_0
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: A_1
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: B_0
10-06 11:02:47.246 18230-18230/leavesc.hello.rxjavademo E/MainActivity: value is: B_1

3.3、concatMap()

concatMap()flatMap() 基本一樣,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的

還是用 flatMap()的例子來看

Observable.fromIterable(listArrayList).flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(List<String> stringList1) throws Exception {
                if (stringList1.get(0).startsWith("A")) {
                    return Observable.fromIterable(stringList1).delay(200, TimeUnit.MILLISECONDS);
                }
                return Observable.fromIterable(stringList1);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "value is: " + s);
            }
        });

進行了一次延時操作,可以看到兩次事件的發送順序顛倒了

10-06 11:07:30.753 18702-18702/leavesc.hello.rxjavademo E/MainActivity: value is: B_0
10-06 11:07:30.753 18702-18702/leavesc.hello.rxjavademo E/MainActivity: value is: B_1
10-06 11:07:30.953 18702-18716/leavesc.hello.rxjavademo E/MainActivity: value is: A_0
10-06 11:07:30.953 18702-18716/leavesc.hello.rxjavademo E/MainActivity: value is: A_1

使用 concatMap() 則順序將保持一致

3.4、buffer()

從需要發送的事件當中獲取指定數量的事件,並將這些事件放到緩衝區當中一併發出。buffer 有兩個參數,參數一count用於指點緩衝區大小,參數二 skip用指定當緩衝區滿了時,發送下一次事件序列的時候要跳過多少元素

            Observable.just(1, 2, 3, 4, 5, 6)
                .buffer(2, 2)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(List<Integer> integers) {
                        Log.e(TAG, "緩衝區大小: " + integers.size());
                        for (Integer i : integers) {
                            Log.e(TAG, "元素: " + i);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });
 onSubscribe
 緩衝區大小: 2
 元素: 1
 元素: 2
 緩衝區大小: 2
 元素: 3
 元素: 4
 緩衝區大小: 2
 元素: 5
 元素: 6
 onComplete

3.5、groupBy()

用於將數據進行分組,每個分組都會返回一個被觀察者。groupBy() 方法的返回值用於指定分組名,每返回一個新值就代表會創建一個分組

            Observable.just(1, 2, 3, 4, 5, 6, 7)
                .groupBy(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) {
                        if (integer < 4) {
                            return "hello";
                        }
                        return "hi";
                    }
                })
                .subscribe(new Observer<GroupedObservable<String, Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(final GroupedObservable<String, Integer> observable) {
                        observable.subscribe(new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.e(TAG, "GroupedObservable onSubscribe");
                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.e(TAG, "GroupedObservable onNext key :" + observable.getKey());
                                Log.e(TAG, "GroupedObservable onNext value :" + integer);
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.e(TAG, "GroupedObservable onError");
                            }

                            @Override
                            public void onComplete() {
                                Log.e(TAG, "GroupedObservable onComplete");
                            }
                        });
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });
10-06 11:16:35.616 19015-19015/? E/MainActivity: onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :1
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :2
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hello
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :3
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onSubscribe
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :4
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :5
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :6
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext key :hi
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onNext value :7
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onComplete
10-06 11:16:35.616 19015-19015/? E/MainActivity: GroupedObservable onComplete
10-06 11:16:35.616 19015-19015/? E/MainActivity: onComplete

3.6、scan()

scan() 操作符對原始 Observable 發射的第一條數據應用一個函數,然後將那個函數的結果作為自己的第一項數據發射。它將函數的結果同第二項數據一起填充給這個函數來產生它自己的第二項數據。它持續進行這個過程來產生剩餘的數據序列

        Observable.just(1, 5, 8, 12).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) {
                Log.e(TAG, "integer : " + integer);
                Log.e(TAG, "integer2 : " + integer2);
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 1
10-06 11:25:19.389 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 5
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 6
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 6
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 8
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 14
10-06 11:25:19.399 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer : 14
10-06 11:25:19.409 19158-19158/leavesc.hello.rxjavademo E/MainActivity: integer2 : 12
10-06 11:25:19.409 19158-19158/leavesc.hello.rxjavademo E/MainActivity: accept : 26

四、組合操作符

4.1、concat() & concatArray()

用於將多個觀察者組合在一起,然後按照參數的傳入順序發送事件,concat() 最多只可以發送4個事件

        Observable.concat(Observable.just(1, 2),
                Observable.just(3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8)).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: " + integer);
            }
        });
accept: 1
accept: 2
accept: 3
accept: 4
accept: 5
accept: 6
accept: 7
accept: 8

concatArray() 作用與 concat() 作用一樣,不過前者可以發送多於 4 個的被觀察者

4.2、merge() & mergeArray()

這個方法與 concat() 作用基本一樣,只是 concat() 是串列發送事件,而 merge() 並行發送事件

    Observable.merge(Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) {
                        return "Test_A_" + aLong;
                    }
                }),
                Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) {
                        return "Test_B_" + aLong;
                    }
                })).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Log.e(TAG, "accept: " + s);
            }
        });
Test_A_0
Test_B_0
Test_A_1
Test_B_1
Test_A_2
Test_B_2
Test_B_3
Test_A_3
Test_A_4
Test_B_4
Test_A_5
Test_B_5

mergeArray() 可以發送 4 個以上的被觀察者

4.3、concatArrayDelayError() & mergeArrayDelayError()

concatArray()mergeArray() 兩個方法當中,如果其中有一個被觀察者發送了一個 Error 事件,那麼就會停止發送事件,如果想 onError() 事件延遲到所有被觀察者都發送完事件後再執行的話,可以使用 concatArrayDelayError()mergeArrayDelayError()

首先使用 concatArray() 來驗證其發送 onError() 事件是否會中斷其他被觀察者的發送事件

Observable.concatArray(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("Normal Exception"));
            }
        }), Observable.just(30, 40, 50)).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });
onNext: 1
onNext: 2
onError: Normal Exception

從結果可以知道,確實中斷了,現在換用 concatArrayDelayError()

10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 1
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 2
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 30
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 40
10-06 04:00:04.935 6514-6514/? E/MainActivity: onNext: 50
10-06 04:00:04.935 6514-6514/? E/MainActivity: onError: Normal Exception

從結果可以看到,onError 事件是在所有被觀察者發送完事件才發送的

4.4、zip()

zip() 操作符返回一個 Obversable,它使用這個函數按順序結合兩個或多個 Observables 發射的數據項,然後它發射這個函數返回的結果。它按照嚴格的順序應用這個函數。它只發射與發射數據項最少的那個 Observable 一樣多的數據

        Observable.zip(Observable.just(1, 2, 3, 4), Observable.just(5, 6, 7, 8, 9),
                new BiFunction<Integer, Integer, String>() {
                    @Override
                    public String apply(Integer integer, Integer integer2) throws Exception {
                        return String.valueOf(integer) + "_" + String.valueOf(integer2);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: " + s);
                    }
                });
accept: 1_5
accept: 2_6
accept: 3_7
accept: 4_8

4.5、combineLatest() & combineLatestDelayError()

combineLatest() 的作用與 zip() 類似,combineLatest() 發送事件的序列是與發送的時間線有關的,當兩個 Observables 中的任何一個發射了一個數據時,通過一個指定的函數組合每個 Observable 發射的最新數據,然後發射這個函數的結果

Observable.zip(
               Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
                        .map(new Function<Long, String>() {
                            @Override
                            public String apply(Long aLong) {
                                String s1 = "A" + aLong;
                                Log.e(TAG, "A 發送的事件 " + s1);
                                return s1;
                            }
                        }), Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
                        .map(new Function<Long, String>() {
                            @Override
                            public String apply(Long aLong) {
                                String s1 = "B" + aLong;
                                Log.e(TAG, "B 發送的事件 " + s1);
                                return s1;
                            }
                        }),
                new BiFunction<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) throws Exception {
                        return value1 + "_" + value2;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: " + s);
                    }
                });
10-06 05:17:06.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A1
10-06 05:17:07.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A2
10-06 05:17:07.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B1
10-06 05:17:07.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A1_B1
10-06 05:17:08.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A3
10-06 05:17:08.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B2
10-06 05:17:08.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A2_B2
10-06 05:17:09.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B3
10-06 05:17:09.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A3_B3
10-06 05:17:09.337 7227-7241/leavesc.hello.rxjavademo E/MainActivity: A 發送的事件 A4
10-06 05:17:10.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: B 發送的事件 B4
10-06 05:17:10.337 7227-7242/leavesc.hello.rxjavademo E/MainActivity: accept: A4_B4

當發送 A1 和 A2 事件時,B 並沒有發送任何事件,所以不會觸發到 accept 方法。當發送了 B1 事件之後,就會與 A 最新發送的事件 A2 結合成 A1_B2,之後的發射規則也以此類推

combineLatestDelayError() 多了延遲發送 onError() 的功能

4.6、reduce()

scan() 操作符的作用類似,也是將發送數據以一定邏輯聚合起來,區別在於 scan() 每處理一次數據就會將事件發送給觀察者,而 reduce() 會將所有數據聚合在一起才會發送事件給觀察者

Observable.just(1, 3, 5, 7).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                Log.e(TAG, "integer1 : " + integer);
                Log.e(TAG, "integer2 : " + integer2);
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
integer1 : 1
integer2 : 3
integer1 : 4
integer2 : 5
integer1 : 9
integer2 : 7
accept : 16

4.7、collect()

collect()reduce() 類似,但它的目的是收集原始 Observable 發射的所有數據到一個可變的數據結構

Observable.just(1, 2, 3, 4)
                .collect(new Callable<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                        integers.add(integer);
                    }
                })
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> integers) throws Exception {
                        Log.e(TAG, "accept : " + integers);
                    }
                });
accept : [1, 2, 3, 4]

4.8、startWith() & startWithArray()

在發送事件之前追加事件,startWith() 追加一個事件,startWithArray() 可以追加多個事件,追加的事件會先發出

        Observable.just(4, 5)
                .startWithArray(2, 3)
                .startWith(1)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "accept : " + integer);
                    }
                });
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 4
10-06 05:38:21.081 8033-8033/leavesc.hello.rxjavademo E/MainActivity: accept : 5

4.9、count()

返回被觀察者發送事件的數量

        Observable.just(1, 2, 3)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "aLong : " + aLong);
                    }
                });
aLong : 3

五、功能操作符

5.1、delay()

延遲一段事件再發送事件

        Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        Log.e(TAG, "value : " + value);
                    }
                });

5.2、doOnEach()

Observable 發送一次事件之前都會回調這個方法

Observable.just(1, 2, 3)
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.e(TAG, "integerNotification value : " + integerNotification.getValue());
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        Log.e(TAG, "accept : " + value);
                    }
                });
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 1
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 2
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: accept : 3
10-06 05:53:28.510 8645-8645/? E/MainActivity: integerNotification value : null

5.3、doOnNext()

Observable 發送 onNext() 之前都會先回調這個方法

Observable.just(1, 2, 3)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "doOnNext accept : " + integer);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        Log.e(TAG, "accept : " + value);
                    }
                });
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3
10-06 05:55:25.618 8758-8758/leavesc.hello.rxjavademo E/MainActivity: accept : 3

5.4、doAfterNext()

Observable 發送 onNext() 之後都會回調這個方法

Observable.just(1, 2, 3)
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "doOnNext accept : " + integer);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        Log.e(TAG, "accept : " + value);
                    }
                });
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 1
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 2
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 05:57:09.357 8872-8872/leavesc.hello.rxjavademo E/MainActivity: doOnNext accept : 3

5.5、doOnComplete()

Observable 調用 onComplete() 之前都會回調這個方法

Observable.just(1, 2, 3)
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnComplete run()");
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        Log.e(TAG, "accept : " + value);
                    }
                });
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 06:08:43.688 8982-8982/leavesc.hello.rxjavademo E/MainActivity: doOnComplete run()

5.6、doOnError()

Observable 發送 onError() 之前都會回調這個方法

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("Normal Exception"));
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "doOnError accept() : " + throwable.getMessage());
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });
10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 1
10-06 06:14:17.894 9230-9230/? E/MainActivity: onNext : 2
10-06 06:14:17.894 9230-9230/? E/MainActivity: doOnError accept() : Normal Exception
10-06 06:14:17.894 9230-9230/? E/MainActivity: onError : Normal Exception

5.7、doOnSubscribe()

Observable 發送 onSubscribe() 之前會回調這個方法

5.8、doOnDispose()

當調用 Disposabledispose() 之後會回調該方法

5.9、doOnLifecycle()

在回調 onSubscribe 之前回調該方法的第一個參數的回調方法,可以使用該回調方法決定是否取消訂閱,doOnLifecycle() 第二個參數的回調方法的作用與 doOnDispose() 一樣

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnLifecycle accept");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doOnLifecycle run");
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
                this.disposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
                disposable.dispose();
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle accept
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:31:45.011 9602-9602/leavesc.hello.rxjavademo E/MainActivity: doOnLifecycle run

5.10、doOnTerminate() & doAfterTerminate()

doOnTerminate 是在 onError 或者 onComplete 發送之前回調,而 doAfterTerminate 則是 onError 或者 onComplete 發送之後回調

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doOnTerminate run");
            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doAfterTerminate run");
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 06:34:55.968 9713-9713/? E/MainActivity: onSubscribe
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 1
10-06 06:34:55.968 9713-9713/? E/MainActivity: onNext : 2
10-06 06:34:55.968 9713-9713/? E/MainActivity: doOnTerminate run
10-06 06:34:55.968 9713-9713/? E/MainActivity: onComplete
10-06 06:34:55.968 9713-9713/? E/MainActivity: doAfterTerminate run

5.11、doFinally()

在所有事件發送完畢之後回調該方法。 doFinally()doAfterTerminate() 的區別在於取消訂閱時,如果取消訂閱,之後 doAfterTerminate() 就不會被回調,而 doFinally() 無論怎麼樣都會被回調,且都會在事件序列的最後

5.12、onErrorReturn()

當接受到一個 onError() 事件之後回調,返回的值會回調 onNext() 方法,並正常結束該事件序列

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("Normal Exception"));
            }
        }).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                return 7;
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:43:13.702 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onNext : 7
10-06 06:43:13.712 9946-9946/leavesc.hello.rxjavademo E/MainActivity: onComplete

5.13、onErrorResumeNext()

當接收到 onError() 事件時,返回一個新的 Observable,並正常結束事件序列

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("Normal Exception"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                Log.e(TAG, "onErrorResumeNext apply: " + throwable.getMessage());
                return Observable.just(4, 5, 6);
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onErrorResumeNext apply: Normal Exception
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
10-06 06:46:36.650 10243-10243/leavesc.hello.rxjavademo E/MainActivity: onComplete

5.14、 onExceptionResumeNext()

onErrorResumeNext() 作用基本一致,但是這個方法只能捕捉 Exception,不能捕獲 Error

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Exception("Normal Exception"));
    }
}).onExceptionResumeNext(new Observable<Integer>() {
    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        Log.e(TAG, "onExceptionResumeNext subscribeActual");
        observer.onNext(3);
        observer.onComplete();
    }
}).subscribe(new Observer<Integer>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext : " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onExceptionResumeNext subscribeActual
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 06:51:49.396 10369-10369/leavesc.hello.rxjavademo E/MainActivity: onComplete

emitter.onError(new Exception("Normal Exception")) 改為 emitter.onError(new Error("Normal Exception"));

異常將不會被捕獲

10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 06:53:21.655 10479-10479/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception

5.15、retry()

如果出現錯誤事件,則會重新發送所有事件序列指定次數

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Error("Normal Exception"));
            }
        }).retry(2).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 06:55:17.273 10591-10591/? E/MainActivity: onSubscribe
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 1
10-06 06:55:17.273 10591-10591/? E/MainActivity: onNext : 2
10-06 06:55:17.273 10591-10591/? E/MainActivity: onError : Normal Exception

5.16、retryUntil()

出現錯誤事件之後,可以通過此方法判斷是否繼續發送事件

    private int index = 1;

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("Normal Exception"));
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                Log.e(TAG, "getAsBoolean");
                return index == 7;
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
                index++;
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 1
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: getAsBoolean
10-06 07:19:07.675 11433-11433/leavesc.hello.rxjavademo E/MainActivity: onError : Normal Exception

5.17、repeat()

以指定次數重覆發送被觀察者的事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).repeat(2).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
10-06 07:38:47.680 12155-12155/? E/MainActivity: onSubscribe
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 1
10-06 07:38:47.690 12155-12155/? E/MainActivity: onNext : 2
10-06 07:38:47.690 12155-12155/? E/MainActivity: onComplete

5.18、repeatWhen()

返回一個新的被觀察者來決定是否重覆發送事件。如果新的被觀察者返回 onComplete 或者 onError 事件,則舊的被觀察者不會發送事件。如果新的被觀察者返回其他事件,則舊的觀察者會發送事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
//                return Observable.empty();
//                return Observable.error(new Exception("Normal Exception"));
//                return Observable.just(1);
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext : " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });

以上三種情況的輸出結果分別是

10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 14:29:05.641 20921-20921/leavesc.hello.rxjavademo E/MainActivity: onComplete
10-06 14:29:36.150 21027-21027/?

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.準備工作: Linux系統: 可以去 https://www.centos.org/download/ 下載DVD ISO版 虛擬機 2.VMware配置 3.Linux系統安裝 4.重啟,安裝成功 ...
  • 前幾天剛接手一個MySQL數據,操作系統為Ubuntu 16.04.5 LTS, 資料庫版本為5.7.23-0ubuntu0.16.04.1(APT方式安裝的MySQL)。這個操作系統下的MySQL的配置文件my.cnf很多地方都讓人有點不適應(跟之前的MySQL環境有些出入,之前都是維護RHEL、... ...
  • 測試環境: create table bqh6 (xm varchar2(10),bmbh number(2),bmmc varchar2(15),gz int);insert into bqh6 values ('張三',01,'技術支持',3500);insert into bqh6 value ...
  • 步驟一:安裝mysql依賴 步驟二:下載mysql社區版 步驟三:創建mysql用戶和用戶組 步驟四:解壓mysql文件 步驟五:創建文件夾 步驟六:初始化mysql 將root初始化密碼複製出來,等會登錄mysql需要使用這個密碼 步驟七:分配mysql文件夾許可權 步驟八:啟動mysql 步驟九: ...
  • 本文將在MySQL源碼探索系列技術博客的第1篇的基礎上接著分析dispatch_command()函數之後的工作流程,主要是分析mysql_parse()和mysql_execute_command()兩個函數的代碼框架,並對其中涉及到的隱式事務如何判斷等等問題結合MySQL源碼進行了介紹。 本人技 ...
  • Oracle用戶 Oracle用戶創建和授權詳解,參考網址如下: http://www.oraclejsq.com/getOracle_jcjc.do?nodeid=010100133 oracle用戶的概念對於Oracle資料庫至關重要,在現實環境當中一個伺服器一般只會安裝一個Oracle實例,一 ...
  • 點進來的同學,大部分是為了學編程而來的,這裡面有一部分學編程是出於興趣愛好,但大部分都是為了找工作或跳槽吧!其中有些人也許是覺得難,也許是遇到瓶頸,也許是因為惰性,總之半途而廢了。在這新一年的開始,我想對你說一句:不要輕易放棄,如果你覺得艱難,說明你正在走上坡路!在為你講為什麼要學習大數據前給分享一 ...
  • 據外媒phonearena報道,估計有3200萬台Android設備很快就無法使用谷歌Chrome移動瀏覽器。根據XDA最近提交的一份文件顯示,Chrome移動瀏覽器應用程式的最低API級別將從4.1提高到4.4。 這意味著仍然運行由Jelly Bean(Jelly Bean是Android 4.1 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...