官網: http://reactivex.io/ 它支持基本所有的主流語言. 這裡我簡單介紹一下Rx.NET. 之前我寫了幾篇關於RxJS的文章, 概念性的東西推薦看這些: http://www.cnblogs.com/cgzl/p/8641738.html http://www.cnblogs.c ...
它支持基本所有的主流語言.
這裡我簡單介紹一下Rx.NET.
之前我寫了幾篇關於RxJS的文章, 概念性的東西推薦看這些:
http://www.cnblogs.com/cgzl/p/8641738.html
http://www.cnblogs.com/cgzl/p/8649477.html
http://www.cnblogs.com/cgzl/p/8662625.html
基本概念和RxJS是一樣的.
下麵開始切入正題.
Rx.NET總覽
Rx.NET總體上看可以分為三個部分:
- 核心部分: Observables, Observers和Subjects
- LINQ和擴展, 用於查詢和過濾Observables
- 併發和調度的支持
.NET Core的Events
.net core裡面的event是通過委托對觀察者模式的實現.
但是event在.net core裡面並不是頭等公民:
- 人們對它的語法+=評價是褒貶不一的.
- 很難進行傳遞和組合
- 很難進行event的連串(chaining)和錯誤處理(尤其是同一個event有多個handler的時候)
- event並沒有歷史記錄
舉個例子:
滑鼠移動這個事件(event), 滑鼠移動的時候會觸發該事件, 這些事件會進入某個管道並記錄該滑鼠的坐標, 這樣就會產生一個數據的集合/序列/流.
這裡我們就是構建了一個基於時間線的滑鼠坐標的序列, 每一次觸發事件就會在這個管道上產生一個新的值. 在另一端, 一旦管道上有了新的值, 那麼管道的觀察者就會得到通知, 這些觀察者通過提供回調函數的方式來註冊到該管道上. 管道每次更新的時候, 這些回調函數就會被調用, 從而刷新了觀察者的數據.
這個例子里, Observable就是管道, 一系列的值在這裡被生成. Observer(觀察者)在Observable有新的值的時候會被通知.
核心介面
IObservable:
- Subscribe(IObserver<T> observer)
IObserver
- void OnNext<T>(T value), 序列里有新的值的時候會調用這個
- void OnCompleted(), 序列結束的時候調用這個
- void OnError(Exception ex), 發生錯誤的時候調用這個
這個和RxJS基本是一樣的.
Marble圖
可以通過marble圖來理解Rx
這圖表示的是IObserver, 每當有新的值在Observable出現的時候, 傳遞到IObservable的Subscribe方法的參數IObserver的OnNext方法就會調用. 發生錯誤的話 OnError方法就會調用, 整個流也就結束了. 沒有錯誤的話, 走到結束就會調用OnComplete方法. 不過有些Observable是不會結束的.
Observable.Subscribe()返回的Subscription對象被Dispose後, Observer就無法收到新的數據了.
創建Observable流/序列
創建流/序列的方式:
- 返回簡單的值
- 包裝現有的值
- 寫一個生成函數
簡單的Observables
- Observable.Empty 返回一個直接結束的Obsevable序列
- Observable.Never 返回一個沒有值, 且永遠不會結束的序列
- Observable.Throw(exception), 返回一個帶有錯誤的序列
- Observable.Return(xxx) 返回單值的序列
包裝Observables
可以包裝下麵這些來返回Observable:
- Action
- Observable.Start(() => 42) 返回一個含有42的序列, 併在Action結束的時候, OnComplete方法被調用.
- Task
- Task.ToObservable() 使用這個擴展方法進行包裝, 當Task結束的時候, Observable推送新的數據, 然後結束
- IEnumerable
- ienumerable.ToObservable() 也是擴展方法, ienumerable的每個值都會作為新的值被推送到Observable上, 最後結束OnComplete
- Event
- Observable.FromEventPattern(obj, "xxChanged") 這是個工廠方法, 需要提供觸發event的對象和event的名字.
生成函數
- Range
- Interval, Timer
- Create(低級), Generate
看圖解釋:
Observable.Range(1, 4):
Observable.Interval(200):
Observable.Timer(200, () => 42):
Observable.Create<int>(o => { o.OnNext(42); o.OnComplete(); return Disposable.Empty; });
Observable.Generate(1, value => value < 5, value => value + 1, value => value);
例子
using System; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { var sequence = GetTaskObservable(); sequence.Subscribe ( x => Console.WriteLine($"OnNext: {x}"), ex => Console.WriteLine($"OnError: {ex}"), () => Console.WriteLine("OnCompleted") ); Console.ReadKey(); } private static IObservable<int> GetSimpleObservable() { return Observable.Return(42); } private static IObservable<int> GetThrowObservable() { return Observable.Throw<int>(new ArgumentException("Error in observable")); } private static IObservable<int> GetEmptyObservable() { return Observable.Empty<int>(); } private static IObservable<int> GetTaskObservable() { return GetTask().ToObservable(); } private static async Task<int> GetTask() { return 42; } private static IObservable<int> GetRangeObservable() { return Observable.Range(2, 10); } private static IObservable<long> GetIntervalObservable() { return Observable.Interval(TimeSpan.FromMilliseconds(200)); } private static IObservable<int> GetCreateObservable() { return Observable.Create<int>(observer => { observer.OnNext(1); observer.OnNext(2); observer.OnNext(3); observer.OnNext(4); observer.OnCompleted(); return Disposable.Empty; }); } private static IObservable<int> GetGenerateObservable() { return Observable.Generate( 1, x => x < 5, x => x + 1, x => x ); } } }
請自行運行查看結果.
Cold 和 Hot Observable
Cold: Observable可以為每個Subscriber創建新的數據生產者
Hot: 每個Subscriber從訂閱的時候開始在同一個數據生產者那裡共用其餘的數據.
從原理來說是這樣的: Cold內部會創建一個新的數據生產者, 而Hot則會一直使用外部的數據生產者.
舉個例子:
Cold: 就相當於我在騰訊視頻買體育視頻會員, 可以從頭看裡面的足球比賽.
Hot: 就相當於看足球比賽的現場直播, 如果來晚了, 那麼前面就看不到了.
把Cold 變 Hot, 使用.Publish()方法.
把Hot 變 Cold, 使用.Subscribe()方法把它變成Subject即可.
過濾和控制序列
LINQ操作符
操作符的類型:
- 過濾
- 合併
- 聚合
- 工具
過濾
sequence.Where(x => x % 2 == 0):
.OfType<Square>():
移除重覆的:
.Distinct():
.DistinctUntilChanged():
過濾頭尾元素:
.Take(2) .Skip(2):
.SkipLast(2) .TakeLast(2):
序列的閥:
a.TakeUnit(b)l a.SkipUntil(b):
實際例子: 把滑鼠移動和點擊轉化為拖拽:
代碼非常的簡單:
var mouseDrags = mouseMoves.SkipUntil(mouseDowns).TakeUnit(mouseUps);
合併
a.Merge(b)
a.Amb(b), 其中的amb是ambiguous的縮寫:
a.Concat(b):
為序列配對:
a.CombineLatest(b, (x, y) => x + y):
a.Zip(b, (x, y) => x + y):
序列的序列:
Merge()是可以達到這種效果的:
.Switch():
聚合
聚合就是指把序列聚合成一個值, 在序列結束後才能返回值
Count() Sum():
Aggregate():
Scan():
其他工具操作符
會有一些副作用
.Do(x => Log(x)): 但是記住不要改變序列的元素
.TimeStamp():
.Throttle(TimeSpan.FromSeconds(1)):
非同步和多線程
非同步就表示不一定按順序執行, 但是它可以保證非阻塞, 通常會有回調函數(或者委托或者async await).
但是非同步對於Rx來說就是它的本性
Rx的同步非同步對比:
多線程
Rx不是多線程的, 但是它是線程自由的(就是可以使用多個線程), 它被設計成只是用必須的線程而已.
多線程表示, 同時有多個線程在執行. 也可以稱作併發. 它可以分擔計算量. 但是據需要考慮線程安全了.
Rx已經做了一些抽象, 所以不必過多的考慮線程安全了.
例如:
Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(xxx):
UI的例子:
Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOn(SynchronizationContext.Current).Subscribe(t => searchBox.Text = t.ToString()):
如果計算量比較大的話:
Observable.Create(大量工作).Subscribe(xxx):
UI假死, 這就不好了.
應該這樣:
Observable.Create(大量工作).SubscribeOn(NewThreadScheduler.Default).ObserveOn(SynchronizationContext.Current).Subscribe(xxx):
Schedulers
Scheduler可以在Rx裡面安排執行動作. 它使用IScheduler介面.
現在就可以把Scheduler理解為是對未來執行的一個抽象.
它同時也負責著Rx所有的併發工作.
Rx提供了很多Scheduler.
下麵是.net現有有很多種在未來執行動作的方法:
Rx裡面就這個:
IScheduler介面:
基本上不用直接去使用IScheduler, 因為內置了很多現成的Schedulers了:
- Immediate, 這是唯一一個不是非同步的Scheduler
- CurrentThread
- EventLoop
- Dispatcher
- NewThread
- TaskPool, ThreadPool
Schedulers實際上到處都使用著:
應該用哪個Scheduler?
Fake Scheduler:
用於測試