Rx.NET 簡介

来源:https://www.cnblogs.com/cgzl/archive/2018/04/04/8710891.html
-Advertisement-
Play Games

官網: http://reactivex.io/ 它支持基本所有的主流語言. 這裡我簡單介紹一下Rx.NET. 之前我寫了幾篇關於RxJS的文章, 概念性的東西推薦看這些: http://www.cnblogs.com/cgzl/p/8641738.html http://www.cnblogs.c ...


官網: http://reactivex.io/

它支持基本所有的主流語言.

這裡我簡單介紹一下Rx.NET.

之前我寫了幾篇關於RxJS的文章, 概念性的東西推薦看這些:

http://www.cnblogs.com/cgzl/p/8641738.html

http://www.cnblogs.com/cgzl/p/8649477.html

http://www.cnblogs.com/cgzl/p/8662625.html

基本概念和RxJS是一樣的.

下麵開始切入正題.

Rx.NET總覽

Rx.NET總體上看可以分為三個部分:

  • 核心部分: Observables, Observers和Subjects
  • LINQ和擴展, 用於查詢和過濾Observables
  • 併發和調度的支持

.NET Core的Events

.net core裡面的event是通過委托對觀察者模式的實現.

但是event在.net core裡面並不是頭等公民:

  • 人們對它的語法+=評價是褒貶不一的.
  • 很難進行傳遞和組合
  • 很難進行event的連串(chaining)和錯誤處理(尤其是同一個event有多個handler的時候)
  • event並沒有歷史記錄

舉個例子:

滑鼠移動這個事件(event), 滑鼠移動的時候會觸發該事件, 這些事件會進入某個管道並記錄該滑鼠的坐標, 這樣就會產生一個數據的集合/序列/流.

這裡我們就是構建了一個基於時間線的滑鼠坐標的序列, 每一次觸發事件就會在這個管道上產生一個新的值. 在另一端, 一旦管道上有了新的值, 那麼管道的觀察者就會得到通知, 這些觀察者通過提供回調函數的方式來註冊到該管道上. 管道每次更新的時候, 這些回調函數就會被調用, 從而刷新了觀察者的數據.

這個例子里, Observable就是管道, 一系列的值在這裡被生成. Observer(觀察者)在Observable有新的值的時候會被通知.

核心介面

IObservable:

  • Subscribe(IObserver<T> observer)

IObserver

  • void OnNext<T>(T value), 序列里有新的值的時候會調用這個
  • void OnCompleted(), 序列結束的時候調用這個
  • void OnError(Exception ex), 發生錯誤的時候調用這個

這個和RxJS基本是一樣的.

Marble圖

可以通過marble圖來理解Rx

這圖表示的是IObserver, 每當有新的值在Observable出現的時候, 傳遞到IObservable的Subscribe方法的參數IObserver的OnNext方法就會調用. 發生錯誤的話 OnError方法就會調用, 整個流也就結束了. 沒有錯誤的話, 走到結束就會調用OnComplete方法. 不過有些Observable是不會結束的.

Observable.Subscribe()返回的Subscription對象被Dispose後, Observer就無法收到新的數據了.

 

創建Observable流/序列

 創建流/序列的方式:

  • 返回簡單的值
  • 包裝現有的值
  • 寫一個生成函數

簡單的Observables

  • Observable.Empty 返回一個直接結束的Obsevable序列
  • Observable.Never 返回一個沒有值, 且永遠不會結束的序列
  • Observable.Throw(exception), 返回一個帶有錯誤的序列
  • Observable.Return(xxx) 返回單值的序列

包裝Observables

可以包裝下麵這些來返回Observable:

  • Action
    • Observable.Start(() => 42) 返回一個含有42的序列, 併在Action結束的時候, OnComplete方法被調用.
  • Task
    • Task.ToObservable() 使用這個擴展方法進行包裝, 當Task結束的時候, Observable推送新的數據, 然後結束
  • IEnumerable
    • ienumerable.ToObservable() 也是擴展方法, ienumerable的每個值都會作為新的值被推送到Observable上, 最後結束OnComplete
  • Event
    • Observable.FromEventPattern(obj, "xxChanged") 這是個工廠方法, 需要提供觸發event的對象和event的名字.

生成函數

  • Range
  • Interval, Timer
  • Create(低級), Generate

看圖解釋:

Observable.Range(1, 4):

Observable.Interval(200):

Observable.Timer(200, () => 42):

            Observable.Create<int>(o =>
            {
                o.OnNext(42);
                o.OnComplete();
                return Disposable.Empty;
            });

 

Observable.Generate(1,
value => value < 5,
value => value + 1,
value => value);

 

例子

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            var sequence = GetTaskObservable();
            sequence.Subscribe
            (
                x => Console.WriteLine($"OnNext: {x}"),
                ex => Console.WriteLine($"OnError: {ex}"),
                () => Console.WriteLine("OnCompleted")
            );
            Console.ReadKey();
        }

        private static IObservable<int> GetSimpleObservable()
        {
            return Observable.Return(42);
        }

        private static IObservable<int> GetThrowObservable()
        {
            return Observable.Throw<int>(new ArgumentException("Error in observable"));
        }

        private static IObservable<int> GetEmptyObservable()
        {
            return Observable.Empty<int>();
        }

        private static IObservable<int> GetTaskObservable()
        {
            return GetTask().ToObservable();
        }

        private static async Task<int> GetTask()
        {
            return 42;
        }

        private static IObservable<int> GetRangeObservable()
        {
            return Observable.Range(2, 10);
        }

        private static IObservable<long> GetIntervalObservable()
        {
            return Observable.Interval(TimeSpan.FromMilliseconds(200));
        }

        private static IObservable<int> GetCreateObservable()
        {
            return Observable.Create<int>(observer =>
            {
                observer.OnNext(1);
                observer.OnNext(2);
                observer.OnNext(3);
                observer.OnNext(4);
                observer.OnCompleted();
                return Disposable.Empty;
            });
        }

        private static IObservable<int> GetGenerateObservable()
        {
            return Observable.Generate(
                1,
                x => x < 5,
                x => x + 1,
                x => x
            );
        }
    }
}

 

請自行運行查看結果.

Cold 和 Hot Observable

Cold: Observable可以為每個Subscriber創建新的數據生產者

Hot: 每個Subscriber從訂閱的時候開始在同一個數據生產者那裡共用其餘的數據.

從原理來說是這樣的: Cold內部會創建一個新的數據生產者, 而Hot則會一直使用外部的數據生產者.

舉個例子:

Cold: 就相當於我在騰訊視頻買體育視頻會員, 可以從頭看裡面的足球比賽.

Hot: 就相當於看足球比賽的現場直播, 如果來晚了, 那麼前面就看不到了.

把Cold 變 Hot, 使用.Publish()方法.

把Hot 變 Cold, 使用.Subscribe()方法把它變成Subject即可.

 

過濾和控制序列

LINQ操作符

操作符的類型:

  • 過濾
  • 合併
  • 聚合
  • 工具

過濾

sequence.Where(x => x % 2 == 0):

.OfType<Square>():

移除重覆的:

.Distinct():

.DistinctUntilChanged():

過濾頭尾元素:

.Take(2)  .Skip(2):

.SkipLast(2)     .TakeLast(2):

序列的閥:

a.TakeUnit(b)l   a.SkipUntil(b):

實際例子: 把滑鼠移動和點擊轉化為拖拽:

代碼非常的簡單:

var mouseDrags = mouseMoves.SkipUntil(mouseDowns).TakeUnit(mouseUps);

合併

a.Merge(b)

a.Amb(b), 其中的amb是ambiguous的縮寫:

a.Concat(b):

為序列配對:

a.CombineLatest(b, (x, y) => x + y):

a.Zip(b, (x, y) => x +  y):

序列的序列:

Merge()是可以達到這種效果的:

.Switch():

聚合

聚合就是指把序列聚合成一個值, 在序列結束後才能返回值

Count() Sum():

Aggregate():

Scan():

其他工具操作符

會有一些副作用

 .Do(x => Log(x)): 但是記住不要改變序列的元素

.TimeStamp():

.Throttle(TimeSpan.FromSeconds(1))

 

非同步和多線程

非同步就表示不一定按順序執行, 但是它可以保證非阻塞, 通常會有回調函數(或者委托或者async await).

但是非同步對於Rx來說就是它的本性

Rx的同步非同步對比:

多線程

Rx不是多線程的, 但是它是線程自由的(就是可以使用多個線程), 它被設計成只是用必須的線程而已.

多線程表示, 同時有多個線程在執行. 也可以稱作併發. 它可以分擔計算量. 但是據需要考慮線程安全了.

Rx已經做了一些抽象, 所以不必過多的考慮線程安全了.

例如: 

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(xxx):

UI的例子:

Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOn(SynchronizationContext.Current).Subscribe(t => searchBox.Text = t.ToString()):

如果計算量比較大的話:

Observable.Create(大量工作).Subscribe(xxx):

UI假死, 這就不好了.

應該這樣:

Observable.Create(大量工作).SubscribeOn(NewThreadScheduler.Default).ObserveOn(SynchronizationContext.Current).Subscribe(xxx):

 

Schedulers

Scheduler可以在Rx裡面安排執行動作. 它使用IScheduler介面.

現在就可以把Scheduler理解為是對未來執行的一個抽象.

它同時也負責著Rx所有的併發工作.

Rx提供了很多Scheduler.

下麵是.net現有有很多種在未來執行動作的方法:

Rx裡面就這個:

IScheduler介面:

基本上不用直接去使用IScheduler, 因為內置了很多現成的Schedulers了:

  • Immediate, 這是唯一一個不是非同步的Scheduler
  • CurrentThread
  • EventLoop
  • Dispatcher
  • NewThread 
  • TaskPool, ThreadPool

Schedulers實際上到處都使用著:

應該用哪個Scheduler?

Fake Scheduler:

用於測試

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大家好!我叫藍顏,我是一名大專生。這是我第一次接觸博客園,以後也會一直在。 在學校期間,參加技能大賽(物聯網),接觸到的C#。之後學校教務處要一個調課軟體, 於是我就小試牛刀試了試。當然了,這也是我第一次寫,途中遇到很多問題。竟然是調課 系統肯定會用到word,因為老師的課表都是用word成列出來的 ...
  • 在序言中,我們提到函數式編程的兩大特征:無副作用、函數是第一公民。現在,我們先來深入第一個特征:無副作用。 無副作用是通過引用透明(Referential transparency)來定義的。如果一個表達式滿足將它替換成它的值,而程式的行為不變,則稱這個表達式是引用透明的。 現在,我們不妨進行一個嘗 ...
  • 介紹asp.net core創建的列表模板頁面與一些佈局信息。 ...
  • 1、在Startup類的Configure方法,添加身份驗證的中間件AuthenticationMiddleware 2、在Startup類的ConfigureServices方法,添加Cookie驗證的服務,使用Cookies驗證體系, CookieAuthenticationDefaults.A ...
  • 前言 由於項目需求,需要將Excel中的數據進過一定轉換導入僅Oracle資料庫中。考慮到當Excel數據量較大時,迴圈Insert語句效率太低,故採用批量插入的方法。在插入操作運行時,會造成系統短暫的“卡死”現象。為了讓用戶知道插入的狀態,需要製作一個進度條來顯示插入的進度。 批量插入 項目中運用 ...
  • MiniBlink的作者是 龍泉寺掃地僧 miniblink是什麼? (抄了一下 龍泉寺掃地僧 寫的簡潔) Miniblink是一個全新的、追求極致小巧的瀏覽器內核項目,其基於chromium最新版內核,去除了chromium所有多餘的部件,只保留最基本的排版引擎blink。Miniblink保持了 ...
  • 深入.NET平臺和C#編程 之化骨綿掌 假 如 你 的 人 生 有 理 想,那 麽 就 一 定 要 去 追,不 管 你 現 在 的 理 想 在 別 人 看 來是 多 麽 的 可 笑 , 你 也 不 用 在 乎& ...
  • 一、概述 數據透視表(Pivot Table)是一種互動式的表,可以進行某些計算,如求和與計數等,可動態地改變透視表版面佈置,也可以重新安排行號、列標和頁欄位。當改變版面佈置時,數據透視表也會按照新的佈置來進行更新,可以說是一個功能強大的數據分析工具。因此,本篇文章將介紹在C# 中關於Excel數據 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...