RxJava 和 RxAndroid 五(線程調度)

来源:http://www.cnblogs.com/zhaoyanjun/archive/2016/07/01/5624395.html
-Advertisement-
Play Games

對rxJava不瞭解的同學可以先看 RxJava 和 RxAndroid 一 (基礎)RxJava 和 RxAndroid 二(操作符的使用)RxJava 和 RxAndroid 三(生命周期控制和記憶體優化) RxJava 和 RxAndroid 四(RxBinding的使用) 本文將有幾個例子說明 ...


對rxJava不瞭解的同學可以先看

RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和記憶體優化)

RxJava 和 RxAndroid 四(RxBinding的使用)

 

本文將有幾個例子說明,rxjava線程調度的正確使用姿勢。

例1

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: main           -- 主線程
/rx_map: main        --  主線程
/rx_subscribe: main   -- 主線程

例2

   new Thread(new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 void rx(){
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }

 

      結果

/rx_newThread: Thread-564   -- 子線程
/rx_call: Thread-564              -- 子線程
/rx_map: Thread-564            -- 子線程 
/rx_subscribe: Thread-564    -- 子線程

 

  • 通過例1和例2,說明,Rxjava預設運行在當前線程中。如果當前線程是子線程,則rxjava運行在子線程;同樣,當前線程是主線程,則rxjava運行在主線程

 

例3

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1    --io線程
/rx_map: main                                     --主線程
/rx_subscribe: main                              --主線程

 

例4

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ; 

      結果

/rx_call: RxCachedThreadScheduler-1     --io線程
/rx_map: RxCachedThreadScheduler-1   --io線程
/rx_subscribe: main                              --主線程

   

  • 通過例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符預設運行在事件產生的線程之中。事件消費只是在 subscribe() 裡面。
  • 對於 create() , just() , from()   等                 --- 事件產生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  •   事件產生:預設運行在當前線程,可以由 subscribeOn()  自定義線程

         事件加工:預設跟事件產生的線程保持一致, 可以由 observeOn() 自定義線程

       事件消費:預設運行在當前線程,可以有observeOn() 自定義

 

例5  多次切換線程

 

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新線程

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io線程

                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定義事件產生線程:io線程
                .observeOn(AndroidSchedulers.mainThread())     //事件消費線程:主線程

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1           -- io 線程
/rx_map: RxNewThreadScheduler-1             -- new出來的線程
/rx_filter: RxCachedThreadScheduler-2        -- io線程
/rx_subscribe: main                                   -- 主線程

 

例6:只規定了事件產生的線程

       Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  結果

/rx--create: RxCachedThreadScheduler-4                      // io 線程
/rx--subscribe: RxCachedThreadScheduler-4                 // io 線程

     

例:7:只規定事件消費線程

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  結果

/rx--create: main                                           -- 主線程
/rx--subscribe: RxNewThreadScheduler-1        --  new 出來的子線程 

      

    從例6可以看出,如果只規定了事件產生的線程,那麼事件消費線程將跟隨事件產生線程。

    從例7可以看出,如果只規定了事件消費的線程,那麼事件產生的線程和 當前線程保持一致。

 

例8:線程調度封裝

 在Android 常常有這樣的場景,後臺處理處理數據,前臺展示數據。

一般的用法:

   Observable
                .just( "123" )
                .subscribeOn( Schedulers.io())
                .observeOn( AndroidSchedulers.mainThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  但是項目中這種場景有很多,所以我們就想能不能把這種場景的調度方式封裝起來,方便調用。

簡單的封裝

    public Observable apply( Observable observable ){
       return observable.subscribeOn( Schedulers.io() )
                .observeOn( AndroidSchedulers.mainThread() ) ;
    }

使用

  apply( Observable.just( "123" ) )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {

                    }
                }) ;

弊端:雖然上面的這種封裝可以做到線程調度的目的,但是它破壞了鏈式編程的結構,是編程風格變得不優雅。

改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另一種類型的Observable )

改進後的封裝

    Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

  使用

      Observable
                .just( "123" )
                .compose( schedulersTransformer )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  弊端:雖然保持了鏈式編程結構的完整,但是每次調用 .compose( schedulersTransformer ) 都是 new 了一個對象的。所以我們需要再次封裝,儘量保證單例的模式。

改進後的封裝

package lib.app.com.myapplication;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

/**
 * Created by ${zyj} on 2016/7/1.
 */
public class RxUtil {

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

   public static  <T> Observable.Transformer<T, T> applySchedulers() {
        return (Observable.Transformer<T, T>) schedulersTransformer;
    }

}

  使用

    Observable
                .just( "123" )
                .compose( RxUtil.<String>applySchedulers() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  

 

 

 

 

 

 

 

 

  

  

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 說到JavaScript中的分支結構,我們就不得不提到流程式控制制這個詞,我們所有的程式都是由數據和演算法組成的。程式=數據+演算法通常我們所說的演算法都可以通過"順序","分支","迴圈"三種結構來組合完成。 在ECMA中規定了一些語句(也稱為流程式控制制語句,分支結構語句),從本質上來說,這些語句定義了ECM ...
  • 在一些網站進行上傳時,當單擊了“瀏覽”按鈕之後會彈出【選擇文件】的對話框。想要實現這一功能,用input的file控制項來實現就好啦~ 效果圖是醬嬸的: 註意!別以為這個是由一個text和一個button組合成的,其實它就是一個file控制項哦 今天工作中遇到要求:不顯示“未選擇任何文件”,搗鼓夠一個小 ...
  • 【問題產生】 Webview 通過 addjavascriptInterface 傳遞對象給前端,一切正常。但是 Android官方已提醒此功能是有安全風險,改用 safe-java-js-webview-bridge 做java和js交互。 官方的用法正常: 但如果我們在body里的<script ...
  • 閉包 閉包: 指有權訪問另一個函數作用域中的變數的函數。 創建閉包的常見方式就是在一個函數內部創建另一個函數: function createComparisonFunction(propertyName) { return function (obj1, obj2) { // 訪問了外部函數中的變 ...
  • 1.流(flow)是瀏覽器在頁面上擺放HTML元素所用的方法。 對於塊元素,瀏覽器從上到下沿著元素流逐個顯示所遇到的各個元素,會在每個塊元素之間加一個換行; 對於內聯元素,在水平方向會相互挨著,總體上會從左上方留向右下方。 2.流與盒模型 盒模型:從CSS角度來看,每個元素都是一個盒子。由內容區(c ...
  • Chrome 瀏覽器具有強大的跨平臺能力以及豐富的擴展插件,一直是許多開發者的首要選擇。而利用許多 Chrome 插件,開發者們在開發流程中能夠極大地提高開發效率。我們就整理了十款開發者常用的 Chrome 插件推薦給大家,相信能夠在你的開發中助你一臂之力。 1. 掘金 Chrome 插件 :幫你發 ...
  • 一、1xx 消息 該類型的狀態碼代表請求已被接受,需要繼續處理。 100 Continue 客戶端應當繼續發送請求,這個臨時響應是用來通知客戶端的部分請求已經被伺服器接收,且仍未被拒絕。客戶端應當繼續發送請求的剩餘部分。如果請求已經完成,忽略這個響應。伺服器必須在請求完成後向客戶端發送一個最終響應。 ...
  • 一、概述 一般而言,listview每個item的樣式是一樣的,但也有很多應用場景下不同位置的item需要不同的樣式。 拿微信舉例,前者的代表作是消息列表,而後者的典型則是聊天會話界面。 本文重點介紹後者,也就是多類型item的listview的實現思路和方法,比如實現一個這樣的聊天會話頁面: 二、 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...