場景 生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步. 下麵用一個Asp.NetCore Web-API項目來模擬 創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串. 實現如下: 接著 ...
場景
生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步.
下麵用一個Asp.NetCore Web-API項目來模擬
創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串.
實現如下:
[Route("api/[controller]/[action]")] public class FooController : Control { IMessageQueue _mq; public FooController(IMessageQueue mq) { _mq = mq; } [HttpGet] public string Get() { string str = _mq.ReadOne<string>(); return str; } [HttpGet] public void Set(string v) { _mq.WriteOne(v); } } public interface IMessageQueue { T ReadOne<T>(); void WriteOne<T>(T value); } public class MessageQueue: IMessageQueue { private object _value; public T ReadOne<T>() { return (T)_value; } public void WriteOne<T>(T value) { _value = value; } }
接著在StartUp中把IMessageQueue給註入了.
services.AddSingleton<IMessageQueue, MessageQueue>();
運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/
可以看到成功返回了xxx
第二步, value欄位改為隊列:
使set進去的值不會被下一個覆蓋, get取隊列最前的值
為了線程安全, 這裡使用了ConcurrentQueue<T>
代碼如下:
public class MessageQueue: IMessageQueue { private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>(); public T ReadOne<T>() { _queue.TryDequeue(out object str); return (T)str ; } public void WriteOne<T>(Tvalue) { _queue.Enqueue(value); } }
那麼此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.
調用/api/foo/set/
三, 非同步阻塞
再增加需求, 調換get和set的順序,先get後set模擬非同步, (我這裡的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.
也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之後會不斷地轉圈直到我用set介面放入一個值
方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();
方案B: Monitor, 一個Wait()一個Exit/Release();
但是以上兩個方案都是基於Thread的, .Net4.0之後伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用
方案C: 修改後代碼如下:
public class MessageQueue : IMessageQueue { private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>()); public T ReadOne<T>() { var obj = _queue.Take(); return (T)obj; } public void WriteOne<T>(T value) { _queue.Add(value); } }
此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.
擴展RPC
這裡的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個非同步的RPC .
比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer.
我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的非同步方案.
代碼如下, 首先新增兩個介面:
public interface IMessageQueue { void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func); Task<TResponse> Rpc<TRequest, TResponse>(TRequest req); T ReadOne<T>(); void WriteOne<T>(T data); }
接著定義一個特殊的任務類:
public class RpcTask<TRequest, TResponse> { public TaskCompletionSource<TResponse> Tcs { get; set; } public TRequest Request { get; set; } }
實現剛纔新加的兩個介面:
public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req) { TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>(); _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs}); return tcs.Task; } public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func) { var obj = _queue.Take(); if(obj is RpcTask<TRequest, TResponse> t) { var response = func(t.Request); t.Tcs.SetResult(response); } }
同樣的, 寫兩個Web API介面, 一個請求等待結果 一個負責處理工作
[HttpGet] public async Task<string> Ask(string v) { var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v }); return $"[{response.DoneTime}] {response.Id}"; } [HttpGet] public void Answer() { _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now }); }
上面還隨便寫了兩個class作為請求和返回
public class MyRequest { public string Id { get; set; } } public class MyResponse { public string Id { get; set; } public DateTime DoneTime { get; set; } }
測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask介面的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待
然後再打開一個選項卡訪問answer介面, 處理剛纔放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.
這裡用到的關鍵類型是TaskCompletionSource<T>.
再擴展
如果是個分散式系統, 請求和處理邏輯不是在一個程式里呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之後再找到Id對於的TCS.SetResult()