C#非同步案例一則

来源:https://www.cnblogs.com/pasoraku/archive/2019/12/02/11971944.html

場景 生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步. 下麵用一個Asp.NetCore Web-API項目來模擬 創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串. 實現如下: 接著 ...


場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步.

下麵用一個Asp.NetCore Web-API項目來模擬

  創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串.

  實現如下:  

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接著在StartUp中把IMessageQueue給註入了.

services.AddSingleton<IMessageQueue, MessageQueue>();

運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/

可以看到成功返回了xxx

第二步, value欄位改為隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值

為了線程安全, 這裡使用了ConcurrentQueue<T>

代碼如下:

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那麼此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.

調用/api/foo/set/

三, 非同步阻塞

再增加需求, 調換get和set的順序,先get後set模擬非同步, (我這裡的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之後會不斷地轉圈直到我用set介面放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

但是以上兩個方案都是基於Thread的, .Net4.0之後伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用

方案C: 修改後代碼如下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.

擴展RPC

這裡的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個非同步的RPC .

比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的非同步方案.

代碼如下, 首先新增兩個介面:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func);
    Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接著定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛纔新加的兩個介面:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

同樣的, 寫兩個Web API介面, 一個請求等待結果 一個負責處理工作

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class作為請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask介面的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待

然後再打開一個選項卡訪問answer介面, 處理剛纔放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.

這裡用到的關鍵類型是TaskCompletionSource<T>.

再擴展

如果是個分散式系統, 請求和處理邏輯不是在一個程式里呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之後再找到Id對於的TCS.SetResult()

 

 

 

 

  


您的分享是我們最大的動力!

更多相關文章
  • 用於註解說明解釋程式的文字就是註釋。 提高代碼的閱讀性;調試程式的重要方法。 java中的註釋類型: 單行註釋 // 多行註釋 /* */ 文檔註釋 /** *文檔註釋 *輸出hello world *@author fang *@version 1.0.0 */ 註釋是一個程式員必須具備的良好編程 ...
  • [TOC] 1. 對Django的認識? 2. Django 、Flask、Tornado的對比 3. 什麼是wsgi,uwsgi,uWSGI? 4. django請求的生命周期? 5. 簡述什麼是FBV和CBV? 6. 如何給CBV的程式添加裝飾器? 7. 簡述MVC和MTV 8. django路 ...
  • 1、步驟 將java代碼編寫到擴展名為.java的文件中(擴展名的查看) 新建文本文檔,重命名為Test.java 以記事本方式打開 寫入代碼 public class Test{ public static void main(String[] args){ System.out.print("H ...
  • 直接看網址吧,所有的GO GUI代碼!~~~~ "網址" ...
  • 1、下載JDK 下載地址 https://www.oracle.com/technetwork/java/javase/downloads/index.html 2、安裝JDK 傻瓜安裝 3、配置環境 新建JAVA_HOME環境變數 新建classpath環境變數 新增Path變數 4、查看是否安裝 ...
  • 1.報錯,too many open files 查詢方法:查看linux允許的最大句柄數,命令ulimit -a。然後使用命令lsof -p 進程id可以查看單個進程所有打開的文件詳情,使用命令lsof -p 進程id | wc -l可以統計進程打開了多少文件,如果文件數過多使用lsof -p 進 ...
  • Serverless 技術為開發人員提供了一種快速而獨立的方式將實現投入生產。這種技術在企業的技術棧中日益流行,自 2017 年以來,它一直是 ThoughtWorks 技術雷達的實驗級別的技術[譯註:技術雷達是 ThoughtWorks 每半年發佈的前沿技術解析]。 本篇文章的第一部分介紹了... ...
  • view: <form method="post" enctype="multipart/form-data" action="@Url.Action("Upload")"> <input type="file" id="file" name="file"/> <button>提交</button> ...
一周排行
  • 背景 當我們對ASP.Net Core內部的某些方法、類的實現感興趣時,有很多方法可以去瞭解,看書,看各種文章,但是最直接也是最深入的辦法就是去閱讀源代碼。ASP.NET Core的源代碼托管在Github,項目地址是:https://github.com/dotnet/aspnetcore。如果只 ...
  • 1. 前言 Xceed wpftoolkit提供了一個 "CheckListBox" ,效果如下: 不過它用起來不怎麼樣,與其這樣還不如參考UWP的ListView實現,而且動畫效果也很好看: 它的樣式如下: 屬性是很多了,但這裡沒有自定義CheckBox樣式的方法,而且也沒法參考它的動畫如何實現。 ...
  • 定義委托 委托實例化 定義具體執行的方法 綁定方法 觸發委托 ...
  • 文章出處: https://www.cnblogs.com/dotnet261010/p/6275821.html 一、WPF簡介 WPF:WPF即Windows Presentation Foundation,翻譯為中文“Windows呈現基礎”,是微軟推出的基於Windows Vista的用戶界 ...
  • 前言 IdentityServer4(以下簡稱 Id4) 是 Asp.Net Core 中一個非常流行的 OpenId Connect 和 OAuth 2.0 框架,可以輕鬆集成到 Asp.Net Core 應用中,並且與 Asp.Net Core Identity 也可以輕鬆集成。博客園也有大佬發 ...
  • //從第1個開始,依次向左插入值。如果鍵不存在,先創建再插入值 隊列形式 先進後出,後進先出 //插入後形式 <-- 10,9,8,7,6,5,4,3,2,1 <-- 方向向左依次進行 stopwatch.Start(); for (int i = 0; i < 10; i++) { var get ...
  • 過濾漢字 Regex.Replace(inputStr,@"[\u4e00-\u9fa5]",string.Empty); 提取漢字: Regex.Replace(inputStr,@"[^\u4e00-\u9fa5]",string.Empty);//註意這裡多了個^符號 ...
  • String類型很簡單,就不做示例演示了,這裡只貼出Helper類 /// <summary> /// 判斷key是否存在 /// </summary> /// <param name="key"></param> /// <param name="db"></param> /// <returns ...
  • 通過上一章的學習,Geometry抽象類表示形狀或路徑。Drawing抽象類扮演了互補的角色,它表示2D圖畫(Drawing)——換句話說,它包含了顯示矢量圖像或點陣圖需要的所有信息。 儘管有幾類畫圖類,但只有GeometryDrawing類能使用已經學習過的幾何圖形。它增加了決定如何繪製圖形的畫筆和 ...
  • 以控台的形式,運行.net core mvc 代碼, Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>();//指定網路主機要使用的啟動類型 ...
x