" 返回《C 併發編程》" "1. 轉換.NET事件" "1.1. 進度通知" "1.2. 定時器示例" "1.3. 錯誤傳遞" "2. 發通知給上下文" "3. 用視窗和緩衝對事件分組" "4. 用限流和抽樣抑制事件流" "4.1. Throttle" "4.2. Sample" "5. 超時" ...
要在應用中安裝一個 NuGet 包 System.Reactive
。
- Rx 可以認作是
LINQ to events
(基於IObservable<T>
) - 所有的 LINQ 操作都可以在 Rx 中使用。
- 從概念上看,過濾(Where)、投影(Select)等簡單操作,和其他 LINQ 提供者的操作是一樣的
1. 轉換.NET事件
1.1. 進度通知
public static void ProgressRun()
{
var progress = new Progress<int>();
var progressReports = Observable.FromEventPattern<int>(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler)
//.Where(u => u.EventArgs % 2 == 0)
;
progressReports.Subscribe(data => Console.WriteLine($"OnNext:{data.EventArgs},ThreadId:{Thread.CurrentThread.ManagedThreadId}."));
Reports(progress);
}
private static void Reports(IProgress<int> progress)
{
System.Console.WriteLine($"Reporting ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
for (int i = 0; i < 10; i++)
{
progress.Report(i);
}
System.Console.WriteLine($"Reported ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
}
輸出:
Reporting ThreadId:1.
Reported ThreadId:1.
OnNext:5,ThreadId:9.
OnNext:0,ThreadId:4.
OnNext:6,ThreadId:10.
OnNext:1,ThreadId:5.
OnNext:2,ThreadId:6.
OnNext:4,ThreadId:8.
OnNext:3,ThreadId:7.
OnNext:7,ThreadId:11.
OnNext:9,ThreadId:13.
OnNext:8,ThreadId:12.
1.2. 定時器示例
public static void TimerRun()
{
var timer = new System.Timers.Timer(interval: 300) { Enabled = true };
var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
handler => (s, a) => handler(s, a),
handler => timer.Elapsed += handler,
handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Console.WriteLine($"OnNext:{data.EventArgs.SignalTime.Millisecond}, ThreadId:{Thread.CurrentThread.ManagedThreadId}."));
System.Console.WriteLine($"Timer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
timer.Start();
Thread.Sleep(2000);
timer.Stop();
System.Console.WriteLine($"Timer stop ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
}
輸出:
Timer start ThreadId:1.
OnNext:473, ThreadId:4.
OnNext:772, ThreadId:5.
OnNext:73, ThreadId:5.
OnNext:373, ThreadId:5.
OnNext:673, ThreadId:5.
OnNext:975, ThreadId:5.
Timer stop ThreadId:1.
1.3. 錯誤傳遞
public static void ObErrorRun()
{
var tcs = new TaskCompletionSource<string>();
var client = new WebClient();
var downloadedStrings = Observable.FromEventPattern(client,
"DownloadStringCompleted"); downloadedStrings.Subscribe(
data =>
{
var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;
if (eventArgs.Error != null)
{
Console.WriteLine("OnNext: (Error) " + eventArgs.Error.GetType());
}
else
{
Console.WriteLine("OnNext: " + eventArgs.Result);
}
},
ex => Console.WriteLine("OnError: " + ex.GetType()),
() => Console.WriteLine("OnCompleted"));
client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
//client.DownloadStringAsync(new Uri("http://www.baidu.com/"));
Thread.Sleep(3000);
}
輸出:
OnNext: (Error) System.Net.WebException
把事件封裝進 Observable
對象後,每次引發該事件都會調用 OnNext
。在處理 AsyncCompletedEventArgs
時會發生令人奇怪的現象,所有的異常信息都是通過數據形式傳遞的(OnNext
),而不是通過錯誤傳遞(OnError
)。
2. 發通知給上下文
如 UI 元素只能被它所屬的 UI 線程式控制制,因此,如果要根據 Rx 的通知來修改 UI,就應該把通知“轉移”到 UI 線程。
- 可以使用
ObserveOn
把通知移動到一個線程池線程,在那裡進行計算,然後再把表示結果的通知返回給 UI 線程 - 通過同步上下文切換實現
public delegate void HelloEventHandler(object sender, HelloEventArgs e);
public class HelloEventArgs : EventArgs
{
public string Name { get; set; }
public HelloEventArgs(string name)
{
Name = name;
}
public int SayHello()
{
System.Console.WriteLine(Name + " Hello.");
return DateTime.Now.Millisecond;
}
}
public static event HelloEventHandler HelloHandlerEvent;
public static void ObservableEventRun()
{
IDisposable ob = null;
var task = Task.Run(() =>
{
Thread.Sleep(500);
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));
Thread.Sleep(2000);
ob?.Dispose();
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("NoConsole")); // 由於
});
// AsyncContext 比如就是 UI上下文
AsyncContext.Run(() =>
{
var uiContext = SynchronizationContext.Current;
Console.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); //Observable.FromEvent()
ob = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
.Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
.Select(s =>
{
// 複雜的計算過程。
Thread.Sleep(100);
var result = s;
Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
return result;
})
.ObserveOn(uiContext)
.Subscribe(s => Console.WriteLine("Subscribe Result " + s + " on thread " + Environment.CurrentManagedThreadId));
//此處不能 task.Wait(); ,會和 Subscribe 中的委托發生死鎖
System.Console.WriteLine("AsyncContext.Run Done on thread " + Environment.CurrentManagedThreadId);
});
task.Wait();
}
輸出:
UI thread is 1
AsyncContext.Run Done on thread 1
lilei Hello.
HanMeimei Hello.
Tom Hello.
Jerry Hello.
Now Millisecond result 36 on thread 6
Subscribe Result 36 on thread 1
Now Millisecond result 44 on thread 6
Subscribe Result 44 on thread 1
Now Millisecond result 44 on thread 6
Subscribe Result 44 on thread 1
Now Millisecond result 44 on thread 6
Subscribe Result 44 on thread 1
3. 用視窗和緩衝對事件分組
下麵的例子使用 Interval
,每秒創建 1 個 OnNext
通知,然後, 使用 Buffer
, 每 2 個通知做一次緩衝:
public static void BufferRun()
{
System.Console.WriteLine($"Buffer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
var ob = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Buffer(2)
.Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Got {string.Join(",", x)}({Thread.CurrentThread.ManagedThreadId})"));
Thread.Sleep(100);
ob.Dispose();
System.Console.WriteLine($"Buffer end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
}
輸出:
Buffer start ThreadId:1.
459: Got 0,1(5)
478: Got 2,3(5)
498: Got 4,5(5)
516: Got 6,7(5)
536: Got 8,9(5)
Buffer end ThreadId:1.
下麵的例子有些類似,使用 Window
創建一些事件組,每組包含 2 個事件:
public static void WindowRun()
{
System.Console.WriteLine($"Window start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
var ob = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Window(2)
.Subscribe(group =>
{
Console.WriteLine($"{DateTime.Now.Millisecond}: Starting new group({Thread.CurrentThread.ManagedThreadId})");
group.Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x},(TID:{Thread.CurrentThread.ManagedThreadId})"),
() => Console.WriteLine($"{DateTime.Now.Millisecond}: Ending group"));
});
Thread.Sleep(100);
ob.Dispose();
System.Console.WriteLine($"Window end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
}
輸出:
Window start ThreadId:1.
959: Starting new group(1)
987: Saw 0,(TID:4)
991: Saw 1,(TID:4)
992: Ending group
994: Starting new group(4)
0: Saw 2,(TID:4)
11: Saw 3,(TID:4)
11: Ending group
11: Starting new group(4)
21: Saw 4,(TID:4)
30: Saw 5,(TID:4)
30: Ending group
30: Starting new group(4)
40: Saw 6,(TID:4)
50: Saw 7,(TID:4)
50: Ending group
51: Starting new group(4)
60: Saw 8,(TID:4)
70: Saw 9,(TID:4)
70: Ending group
70: Starting new group(4)
Window end ThreadId:1.
這幾個例子說明瞭 Buffer
和 Window
的區別:
Buffer
等待組內的所有事件,然後把所有事件作為一個集合發佈Window
用同樣的方法進行分組,但它是在每個事件到達時就發佈
4. 用限流和抽樣抑制事件流
4.1. Throttle
下麵的例子也是監視滑鼠移動, 但使用了 Throttle
,在滑鼠保持靜止 1 秒後才報告最近一條移動事件。
public delegate void MouseEventHandler(object sender, MouseEventArgs e);
public class MouseEventArgs : EventArgs
{
public (int, int) XY { get; set; }
public MouseEventArgs((int, int) xy)
{
XY = xy;
}
public (int, int) GetPosition()
{
return XY;
}
}
public static event MouseEventHandler MouseMove;
public static void ThrottleRun()
{
IDisposable ob = null;
var task = Task.Run(() =>
{
Thread.Sleep(200);
//不觸發
MouseMoveProcess((1, 1));
MouseMoveProcess((1, 11));
MouseMoveProcess((1, 111));
MouseMoveProcess((1, 1111));
//觸發
MouseMoveProcess((2, 2), 2000);
//超時結束
MouseMoveProcess((2, 22));
ob?.Dispose();
});
ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition())
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"));
task.Wait();
}
private static void MouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
{
System.Console.WriteLine($"Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}.");
MouseMove?.Invoke(new object(), new MouseEventArgs(xy));
Thread.Sleep(sleepMillsecond);
}
輸出:
Mouse Move 1,1,After sleep 200.
Mouse Move 1,11,After sleep 200.
Mouse Move 1,111,After sleep 200.
Mouse Move 1,1111,After sleep 200.
Mouse Move 2,2,After sleep 2000.
251: Saw 2,2
Mouse Move 2,22,After sleep 200.
Throttle
常用於類似“文本框自動填充”這樣的場合
- 用戶在文本框中輸入文字,當他停止輸入時,才需要進行真正的檢索。
4.2. Sample
為抑制快速運動的事件序列, Sample 建立了一個有規律的超時時間段, 每個時間段結束時,它就發佈該時間段內最後的一條數據。如果這個時間段沒有數據,就不發佈。
每隔一秒採樣一次
public static void SampleRun()
{
IDisposable ob = null;
var task = Task.Run(() =>
{
Thread.Sleep(200);
//不觸發
MouseMoveProcess((1, 1));
MouseMoveProcess((1, 11));
MouseMoveProcess((1, 111));
MouseMoveProcess((1, 1111));
//觸發
MouseMoveProcess((2, 2), 2000);
//超時結束
MouseMoveProcess((2, 22));
ob?.Dispose();
});
ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition())
.Sample(TimeSpan.FromMilliseconds(500))
.Subscribe(x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"));
task.Wait();
}
輸出:
Mouse Move 1,1,After sleep 200.
Mouse Move 1,11,After sleep 200.
498: Saw 1,11
Mouse Move 1,111,After sleep 200.
Mouse Move 1,1111,After sleep 200.
Mouse Move 2,2,After sleep 2000.
991: Saw 2,2
Mouse Move 2,22,After sleep 200.
992: Saw 2,22
Throttle
和 Sample
操作符與 Where
基本差不多,唯一的區別是:
Throttle
、Sample
根據時間段過濾- 而
Where
根據事件的數據過濾
在抑制快速涌來的輸入流時,這三種操作符提供了三種不同的方法。
5. 超時
Timeout 操作符在輸入流上建立一個可調節的超時視窗。一旦新的事件到達,就重置超時視窗。如果超過期限後事件仍沒到達, Timeout 操作符就結束流,並產生一個包含TimeoutException 的 OnError 通知。
public static void TimeoutRun()
{
IDisposable ob = null;
var task = Task.Run(() =>
{
Thread.Sleep(200);
//不觸發
MouseMoveProcess((1, 1));
MouseMoveProcess((1, 11));
MouseMoveProcess((1, 111));
MouseMoveProcess((1, 1111));
//觸發
MouseMoveProcess((2, 2), 1100);
//超時結束
MouseMoveProcess((2, 22));
ob?.Dispose();
});
ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition())
.Timeout(TimeSpan.FromSeconds(1))//Subscribe後相對一秒超時(連續觸發則不會超時)
.Subscribe(
x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"),
ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}"),
// onCompleted 不會執行
() => System.Console.WriteLine($"{DateTime.Now.Millisecond}: Finished.")
);
System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");
task.Wait();
}
輸出:
138 Subscribe Done
Mouse Move 1,1,After sleep 200.
313: Saw 1,1
Mouse Move 1,11,After sleep 200.
517: Saw 1,11
Mouse Move 1,111,After sleep 200.
722: Saw 1,111
Mouse Move 1,1111,After sleep 200.
923: Saw 1,1111
Mouse Move 2,2,After sleep 1100.
124: Saw 2,2
139: TimeoutException
Mouse Move 2,22,After sleep 200.
在超時之前觀察滑鼠移動,超時發生後進行切換
public static event MouseEventHandler OtherMouseMove;
public static void TimeoutMoveRun()
{
IDisposable ob = null;
var task = Task.Run(() =>
{
Thread.Sleep(200);
//不觸發
MouseMoveProcess((1, 1), 400);
MouseMoveProcess((1, 11), 0);
//為了觸發超時
Thread.Sleep(1100);
System.Console.WriteLine("sleep: 1100");
//由於超時,時間流被遷移到other,下麵不會觸發
MouseMoveProcess((2, 2), 400);
MouseMoveProcess((2, 22), 400);
//other的事件,可以觸發
OtherMouseMoveProcess((3, 3), 400);
OtherMouseMoveProcess((3, 33), 400);
ob?.Dispose();
});
var other = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => OtherMouseMove += handler,
handler => OtherMouseMove -= handler)
.Select(x => x.EventArgs.GetPosition());
ob = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition())
.Timeout(TimeSpan.FromSeconds(1), other)
.Subscribe(
x => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x.Item1},{x.Item2}"),
ex => Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}"));
System.Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");
task.Wait();
}
private static void OtherMouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
{
System.Console.WriteLine($"Other Mouse Move {xy.Item1},{xy.Item2},After sleep {sleepMillsecond}.");
OtherMouseMove?.Invoke(new object(), new MouseEventArgs(xy));
Thread.Sleep(sleepMillsecond);
}
輸出:
793 Subscribe Done
Mouse Move 1,1,After sleep 400.
970: Saw 1,1
Mouse Move 1,11,After sleep 0.
373: Saw 1,11
sleep: 1100
Mouse Move 2,2,After sleep 400.
Mouse Move 2,22,After sleep 400.
Other Mouse Move 3,3,After sleep 400.
281: Saw 3,3
Other Mouse Move 3,33,After sleep 400.
684: Saw 3,33