" 返回《C 併發編程》" "1. 用 async 代碼封裝非同步方法與 Completed 事件" "2. 用 async 代碼封裝 Begin/End 方法" "3. 用 async 代碼封裝並行代碼" "4. 用 async 代碼封裝 Rx Observable 對象" "5. 用 Rx Obs ...
- 1. 用 async 代碼封裝非同步方法與 Completed 事件
- 2. 用 async 代碼封裝 Begin/End 方法
- 3. 用 async 代碼封裝並行代碼
- 4. 用 async 代碼封裝 Rx Observable 對象
- 5. 用 Rx Observable 對象封裝 async 代碼
- 6. Rx Observable 對象和數據流網格
非同步封裝
1. 用 async 代碼封裝非同步方法與 Completed 事件
public static void MyDownloadStringTaskAsyncRun()
{
WebClient client = new WebClient();
string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result;
System.Console.WriteLine(res);
}
public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address)
{
var tcs = new TaskCompletionSource<string>();
// 這個事件處理程式會完成 Task 對象,並自行註銷。
DownloadStringCompletedEventHandler handler = null;
handler = (_, e) =>
{
client.DownloadStringCompleted -= handler;
if (e.Cancelled)
tcs.TrySetCanceled();
else if (e.Error != null)
tcs.TrySetException(e.Error);
else
tcs.TrySetResult(e.Result);
};
// 登記事件,然後開始操作。
client.DownloadStringCompleted += handler;
client.DownloadStringAsync(address);
return tcs.Task;
}
輸出:
<!DOCTYPE html><!--STATUS OK-->
<html>
... ...
</html>
2. 用 async 代碼封裝 Begin/End 方法
public static void GetResponseAsyncRun()
{
WebRequest request = WebRequest.Create("http://www.baidu.com");
var response = request.MyGetResponseAsync().Result;
System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}");
}
public static Task<WebResponse> MyGetResponseAsync(this WebRequest client)
{
return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null);
}
輸出:
WebResponse.ContentLength:14615
- 建議: 要在調用
FromAsync
之前調用BeginOperation
。 - 調用
FromAsync
,並讓用BeginOperation
方法返回的IAsyncOperation
作為參數,這樣也是可以的,但是FromAsync
會採用效率較低的實現方式。
3. 用 async 代碼封裝並行代碼
await Task.Run(() => Parallel.ForEach(...));
通過使用 Task.Run
,所有的並行處理過程都推給了線程池。
Task.Run
返回一個代表並行任務的 Task
對象
- UI 線程可以(非同步地)等待它完成(非阻塞)
4. 用 async 代碼封裝 Rx Observable 對象
事件流中幾種可能關註的情況:
- 事件流結束前的最後一個事件;
- 下一個事件;
- 所有事件。
public delegate void HelloEventHandler(object sender, HelloEventArgs e);
public class HelloEventArgs : EventArgs
{
public string Name { get; set; }
public HelloEventArgs(string name)
{
Name = name;
}
public int SayHello()
{
System.Console.WriteLine(Name + " Hello.");
return DateTime.Now.Millisecond;
}
}
public static event HelloEventHandler HelloHandlerEvent;
public static void FirstLastRun()
{
var task = Task.Run(() =>
{
Thread.Sleep(500);
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));
});
var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
.Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
.Select(s =>
{
// 複雜的計算過程。
Thread.Sleep(100);
var result = s;
Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
return result;
})
.Take(3)//這個標識3個就結束了
;
var res =
Task.Run(async () => await observable
// //4個hello,3個result,res為最後一個的結果
//.FirstAsync()//4個hello,1個result,res為第一個的結果
//.LastAsync()//4個hello,3個result,res為最後一個的結果
//.ToList()//4個hello,3個result,res為3個的結果
).Result;
System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}");
task.Wait();
}
輸出:
lilei Hello.
HanMeimei Hello.
Tom Hello.
Jerry Hello.
Now Millisecond result 534 on thread 7
Now Millisecond result 544 on thread 7
Now Millisecond result 544 on thread 7
Res:544,ResType:Int32
在 await
調用 Observable 對象或 LastAsync
時,代碼(非同步地)等待事件流完成,然後返 回最後一個元素。
- 在內部,await 實際是在訂閱事件流,完成後退訂
cs IObservable<int> observable = ...; int lastElement = await observable.LastAsync(); // 或者 int lastElement = await observable;
使用 FirstAsync
可捕獲事件流中, FirstAsync
方法執行後的下一個事件。
- 本例中
await
訂閱事件流,然後在第一個事件到達後立即結束(並退訂):
cs IObservable<int> observable = ...; int nextElement = await observable.FirstAsync();
使用 ToList
可捕獲事件流中的所有事件:
IObservable<int> observable = ...;
IList<int> allElements = await observable.ToList();
5. 用 Rx Observable 對象封裝 async 代碼
任何非同步操作都可看作一個滿足以下條件之一的可觀察流:
- 生成一個元素後就完成;
- 發生錯誤,不生成任何元素。
ToObservable
和 StartAsync
都會立即啟動非同步操作,而不會等待訂閱
- 但之後訂閱呢,或等待執行完再訂閱呢,能得到結果嗎
- 可以,後面例子中的“輸出”中有體現
如果要讓 observable 對象在接受訂閱後才啟動操作,可使用 FromAsync
- 跟
StartAsync
一樣,它也支持使用CancellationToken
取消
public static void AsyncObservableRun()
{
var client = new HttpClient();
IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接執行
IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接執行
IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//訂閱後執行
var res = Task.Run(async () =>
await response1
//await response2
//await response3
).Result;
System.Console.WriteLine($"Res:{res}");
}
輸出(response1):
Run 1.
Run 2.
Res:1
輸出(response2):
Run 1.
Run 2.
Res:2
輸出(response1):
Run 1.
Run 2.
Run 3.
Res:3
ToObservable
和StartAsync
都返回一個 observable 對象,表示一個已經啟動的非同步操作FromAsync
在每次被訂閱時都會啟動一個全新獨立的非同步操作。
下麵的例子使用一個已有的 URL 事件流,在每個 URL 到達時發出一個請求:
public static void SelectManyRun()
{
IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable();
IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token));
var res = Task.Run(async () => await observable.LastAsync()).Result;
System.Console.WriteLine($"Res:{res}");
}
輸出:
Run 1.
Run 2.
Run 3.
Res:3
6. Rx Observable 對象和數據流網格
同一個項目中
- 一部分使用了 Rx Observable 對象
- 一部分使用了數據流網格
現在需要它們能互相溝通。
網格轉可觀察流
public static void BlockToObservableRun()
{
var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();
integers.Subscribe(
data => Console.WriteLine(data),
ex => Console.WriteLine(ex),
() => Console.WriteLine("Done"));
buffer.Post(1);
buffer.Post(2);
buffer.Complete();
buffer.Completion.Wait();
}
輸出:
1
2
AsObservable
方法會把數據流塊的完成信息(或出錯信息)轉化為可觀察流的完成信息。
- 如果數據流塊出錯並拋出異常,這個異常信息在傳遞給可觀察流時,會被封裝在
AggregateException
對象中。
可觀察流轉網格
public static void ObservableToBlockRun()
{
IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Select(x => x.Timestamp)
.Take(5);
var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x));
ticks.Subscribe(display.AsObserver());
try
{
display.Completion.Wait();
Console.WriteLine("Done.");
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
輸出:
2020/2/1 上午1:42:24 +00:00
2020/2/1 上午1:42:25 +00:00
2020/2/1 上午1:42:26 +00:00
2020/2/1 上午1:42:27 +00:00
2020/2/1 上午1:42:28 +00:00
Done.
- 跟前面一樣,可觀察流的完成信息會轉化為塊的完成信息
- 可觀察流的錯誤信息會轉化為 塊的錯誤信息。