RxJava2.0(轉載)

来源:https://www.cnblogs.com/lanjiabin/archive/2020/07/07/13261608.html
-Advertisement-
Play Games

零、來源 來源:Carson_Ho-簡書 一、基礎知識 角色 作用 類比 被觀察者(Observable) 產生事件 顧客 觀察者(Observer) 接收事件,並給出響應動作 廚房 訂閱(Subscribe) 連接 被觀察者 & 觀察者 服務員 事件(Event) 被觀察者 & 觀察者 溝通的載體 ...


零、來源

來源:Carson_Ho-簡書

一、基礎知識

角色 作用 類比
被觀察者(Observable) 產生事件 顧客
觀察者(Observer) 接收事件,並給出響應動作 廚房
訂閱(Subscribe) 連接 被觀察者 & 觀察者 服務員
事件(Event) 被觀察者 & 觀察者 溝通的載體 菜式


二、基礎使用

1.導入連接

implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2.創建被觀察者

   //創建被觀察者,產生事件
    public Observable<Integer> createObservable() {
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });
        return observable;
    }

3.創建觀察者

	//創建觀察者
    public Observer<Integer> createObserver() {
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 連接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        };
        return observer;
    }

4.建立subscribe()連接

 	//觀察者訂閱被觀察者
    public void createSubscribe() {
        createObservable().subscribe(createObserver());
    }

5.調用和結果

 createSubscribe();

image.png

6.鏈式調用

 	//鏈式調用
    public void chainCall() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 連接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

6.切斷連接

即觀察者 無法繼續 接收 被觀察者的事件,但被觀察者還是可以繼續發送事件

Disposable mDisposable; //1.定義

@Override
public void onSubscribe(Disposable d) {
       mDisposable=d; //2.賦值
       Log.v("lanjiabinRx", "onSubscribe 連接");
}

@Override
public void onNext(Integer value) {
      if (value==2) mDisposable.dispose(); //3.在第二個next事件斷開連接
      Log.v("lanjiabinRx", "onNext " + value + " 事件");
}

三、創建操作符

0.總圖


1. create (基礎發送)

最基礎的創建


    //1.create
    public void chainCall() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 連接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

2. just (立刻發送10以下)

  • 快速創建1個被觀察者對象(Observable)
  • 發送事件的特點:直接發送傳入的事件
  • 最多只能發送十個參數
  • 應用場景:快速創建 被觀察者對象(Observable) & 發送10個以下事件

    //2.just
    public void justDo() {
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:
image.png

3. fromArray (數組發送)

  • 快速創建1個被觀察者對象(Observable)
  • 發送事件的特點:直接發送 傳入的數組數據
  • 會將數組中的數據轉換為Observable對象

應用場景:
1.快速創建 被觀察者對象(Observable) & 發送10個以上事件(數組形式)
2.數組元素遍歷


    //3.fromArray
    public void fromArrayDo() {
        Integer[] items = {0, 1, 2, 3, 4};
        Observable.fromArray(items).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

4. fromIterable (集合發送)

  • 快速創建1個被觀察者對象(Observable)
  • 發送事件的特點:直接發送 傳入的集合List數據
  • 會將數組中的數據轉換為Observable對象

應用場景:
1.快速創建 被觀察者對象(Observable) & 發送10個以上事件(集合形式)
2.集合元素遍歷


    //4.fromIterable
    public void fromIterableDo(){
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        Observable.fromIterable(list).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

// 下列方法一般用於測試使用

<-- empty()  -->
// 該方法創建的被觀察者對象發送事件的特點:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即觀察者接收後會直接調用onCompleted()

<-- error()  -->
// 該方法創建的被觀察者對象發送事件的特點:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()

<-- never()  -->
// 該方法創建的被觀察者對象發送事件的特點:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用

5. defer (獲取最新數據)

  • 直到有觀察者(Observer )訂閱時,才動態創建被觀察者對象(Observable) & 發送事件
  • 通過 Observable工廠方法創建被觀察者對象(Observable)
  • 每次訂閱後,都會得到一個剛創建的最新的Observable對象,這可以確保Observable對象里的數據是最新的

應用場景:
動態創建被觀察者對象(Observable) & 獲取最新的Observable對象數據

   //5.defer
    Integer i = 10; //第一次賦值

    public void deferDo() {
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });

        i = 15; //第二次賦值
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:得到最新賦值的數字,說明取到了最新的數據

image.png

6. timer (延遲發送)

  • 快速創建1個被觀察者對象(Observable)
  • 發送事件的特點:延遲指定時間後,發送1個數值0(Long類型)
  • 本質 = 延遲指定時間後,調用一次 onNext(0)

應用場景:
延遲指定事件,發送一個0,一般用於檢測


    //6.timer
    public void timerDo() {
        // 註:timer操作符預設運行在一個新線程上
        // 也可自定義線程調度器(第3個參數):timer(long,TimeUnit,Scheduler)
        //TimeUnit.SECONDS延遲2s後,發送一個0
     
        /**
         * timer(long delay, TimeUnit unit)
         * delay 數值
         * unit 單位
         * 下麵就是 2數值,單位為秒,所以是2秒
         * */
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Long aLong) {
                /*
                * 得到的結果為0 一般用於檢測
                * */
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

7. interval (周期發送,無限)

  • 快速創建1個被觀察者對象(Observable)
  • 發送事件的特點:每隔指定時間 就發送 事件
  • 發送的事件序列 = 從0開始、無限遞增1的的整數序列

    //7.interval
    public void intervalDo() {
        /**
         * 從0開始遞增
         *
         * @param initialDelay (Long)
         *          初始延遲時間(第一次延遲時間)
         * @param period (Long)
         *          後續數字發射之間的時間間隔(一個周期時間)
         * @param unit
         *          時間單位
         * */
        Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

8. intervalRange (周期發送,有限,指定數據)

  • 作用類似於interval(),但可指定發送的數據的數量
 	//8.intervalRange
    public void intervalRangeDo() {
        /**
         *
         * @param start 起始值
         * @param count 總共要發送的值的數量,如果為零,則運算符將在初始延遲後發出onComplete
         * @param initialDelay 發出第一個值(開始)之前的初始延遲
         * @param period 後續值之間的時間段
         * @param unit 時間單位
         * */
        Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:3-12 經歷10個數

image.png

9. range (無延遲,Integer類型指定數據)

  • 作用類似於intervalRange(),但區別在於:無延遲發送事件
	//9.range
    public void rangeDo(){
        /**
         * @param start
         *            序列中第一個Integer的值
         * @param count
         *           要生成的順序整數的數量
         * */
        Observable.range(3,5).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }	

結果:

image.png

10. rangeLong (無延遲,Long類型指定數據)


    //10.rangeLong
    public void rangeLongDo(){
        /**
         * @param start
         *            Long類型,序列中第一個Integer的值
         * @param count
         *           Long類型,要生成的順序整數的數量
         * */
        Observable.rangeLong(3,8).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe連接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

編程中我們會遇到多少挫折?表放棄,沙漠盡頭必是綠洲。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • kmalloc、vmalloc和malloc這三個常用的API函數具有相當的分量,三者看上去很相似,但在實現上大有講究。kmalloc基於slab分配器,slab緩衝區建立在一個連續的物理地址的大塊記憶體之上,所以緩衝對象也是物理地址連續的。如果在內核中不需要連續的物理地址,而僅僅需要內核空間里連續的 ...
  • 筆者最近工作中遇見一個性能瓶頸問題,MySQL表,每天大概新增776萬條記錄,存儲周期為7天,超過7天的數據需要在新增記錄前老化。連續運行9天以後,刪除一天的數據大概需要3個半小時(環境:128G, 32核,4T硬碟),而這是不能接受的。當然如果要整個表刪除,毋庸置疑用 TRUNCATE TABLE ...
  • 問題現象:在資料庫中的日期顯示正常,但是通過JSP進行調用後發現時間偏差了13個小時。經查詢,此問題是由於MySQL的預設時區設置造成的。 具體排查和操作方法如下: 一、查看MySQL當前時區和時間 > show variables like "%time_zone%"; #查看時區 #time_z ...
  • 什麼是事務 事務是應用程式中一系列對資料庫的操作,所有操作必須成功完成,否則在每個操作中所作的所有更改都會被撤消。也就是事務具有原子性,一個事務中的一系列的操作要麼全部成功,要麼一個都不做。 事務的結束有兩種,當事務中的所有操作全部執行成功時,事務提交結束。如果其中一個操作失敗,將全部回滾到事務執行 ...
  • MySQL 資料庫的存儲結構 資料庫存儲結構 從小到大、行>頁 >區>段>表空間 (在Oracle中將頁稱為"塊") 頁是資料庫管理存儲空間的基本單位,即,資料庫I/O的最小單位是頁 InnoDB預設頁大小為16K,可以通過show variavles like '%innodb_page_size ...
  • 一、什麼是手工管理的備份與恢復? 儘管在Oracle中,已經有了RMAN的備份與恢復。但是作為Oracle備份恢復的一種方式,我們將在本文中通過一個例子來為大家介紹如何使用手工的方式來完成Oracle的備份與恢復。**手工方式的本質是通過操作系統的cp命令完成,**但是在備份與恢復的時候,需要把數據 ...
  • 流處理正變得像數據處理一樣流行。流處理已經超出了其原來的實時數據處理的範疇,它正在成為一種提供數據處理(包括批處理),實時應用乃至分散式事務的新方法的技術。 1、什麼是流處理? 流處理是不斷合併新數據以計算結果的動作。在流處理中,輸入數據不受限制,並且沒有預定的開始或結束。它只是形成一系列事件,這些 ...
  • 本文更新於2019-06-23,使用MySQL 5.7,操作系統為Deepin 15.4。 SQL語句 為了便於描述,此處將創建觸發器的DDL覆述一次,其已於“SQL”章節描述。 CREATE TRIGGER triggername BEFORE|AFTER INSERT|UPDATE|DELETE ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...