C#非同步案例一則

来源:https://www.cnblogs.com/pasoraku/archive/2019/12/02/11971944.html
-Advertisement-
Play Games

場景 生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步. 下麵用一個Asp.NetCore Web-API項目來模擬 創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串. 實現如下: 接著 ...


場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步.

下麵用一個Asp.NetCore Web-API項目來模擬

  創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串.

  實現如下:  

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接著在StartUp中把IMessageQueue給註入了.

services.AddSingleton<IMessageQueue, MessageQueue>();

運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/

可以看到成功返回了xxx

第二步, value欄位改為隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值

為了線程安全, 這裡使用了ConcurrentQueue<T>

代碼如下:

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那麼此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.

調用/api/foo/set/

三, 非同步阻塞

再增加需求, 調換get和set的順序,先get後set模擬非同步, (我這裡的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之後會不斷地轉圈直到我用set介面放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

但是以上兩個方案都是基於Thread的, .Net4.0之後伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用

方案C: 修改後代碼如下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.

擴展RPC

這裡的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個非同步的RPC .

比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的非同步方案.

代碼如下, 首先新增兩個介面:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func);
    Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接著定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛纔新加的兩個介面:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

同樣的, 寫兩個Web API介面, 一個請求等待結果 一個負責處理工作

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class作為請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask介面的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待

然後再打開一個選項卡訪問answer介面, 處理剛纔放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.

這裡用到的關鍵類型是TaskCompletionSource<T>.

再擴展

如果是個分散式系統, 請求和處理邏輯不是在一個程式里呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之後再找到Id對於的TCS.SetResult()

 

 

 

 

  


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 用於註解說明解釋程式的文字就是註釋。 提高代碼的閱讀性;調試程式的重要方法。 java中的註釋類型: 單行註釋 // 多行註釋 /* */ 文檔註釋 /** *文檔註釋 *輸出hello world *@author fang *@version 1.0.0 */ 註釋是一個程式員必須具備的良好編程 ...
  • [TOC] 1. 對Django的認識? 2. Django 、Flask、Tornado的對比 3. 什麼是wsgi,uwsgi,uWSGI? 4. django請求的生命周期? 5. 簡述什麼是FBV和CBV? 6. 如何給CBV的程式添加裝飾器? 7. 簡述MVC和MTV 8. django路 ...
  • 1、步驟 將java代碼編寫到擴展名為.java的文件中(擴展名的查看) 新建文本文檔,重命名為Test.java 以記事本方式打開 寫入代碼 public class Test{ public static void main(String[] args){ System.out.print("H ...
  • 直接看網址吧,所有的GO GUI代碼!~~~~ "網址" ...
  • 1、下載JDK 下載地址 https://www.oracle.com/technetwork/java/javase/downloads/index.html 2、安裝JDK 傻瓜安裝 3、配置環境 新建JAVA_HOME環境變數 新建classpath環境變數 新增Path變數 4、查看是否安裝 ...
  • 1.報錯,too many open files 查詢方法:查看linux允許的最大句柄數,命令ulimit -a。然後使用命令lsof -p 進程id可以查看單個進程所有打開的文件詳情,使用命令lsof -p 進程id | wc -l可以統計進程打開了多少文件,如果文件數過多使用lsof -p 進 ...
  • Serverless 技術為開發人員提供了一種快速而獨立的方式將實現投入生產。這種技術在企業的技術棧中日益流行,自 2017 年以來,它一直是 ThoughtWorks 技術雷達的實驗級別的技術[譯註:技術雷達是 ThoughtWorks 每半年發佈的前沿技術解析]。 本篇文章的第一部分介紹了... ...
  • view: <form method="post" enctype="multipart/form-data" action="@Url.Action("Upload")"> <input type="file" id="file" name="file"/> <button>提交</button> ...
一周排行
    -Advertisement-
    Play Games
  • 1. 說明 /* Performs operations on System.String instances that contain file or directory path information. These operations are performed in a cross-pla ...
  • 視頻地址:【WebApi+Vue3從0到1搭建《許可權管理系統》系列視頻:搭建JWT系統鑒權-嗶哩嗶哩】 https://b23.tv/R6cOcDO qq群:801913255 一、在appsettings.json中設置鑒權屬性 /*jwt鑒權*/ "JwtSetting": { "Issuer" ...
  • 引言 集成測試可在包含應用支持基礎結構(如資料庫、文件系統和網路)的級別上確保應用組件功能正常。 ASP.NET Core 通過將單元測試框架與測試 Web 主機和記憶體中測試伺服器結合使用來支持集成測試。 簡介 集成測試與單元測試相比,能夠在更廣泛的級別上評估應用的組件,確認多個組件一起工作以生成預 ...
  • 在.NET Emit編程中,我們探討了運算操作指令的重要性和應用。這些指令包括各種數學運算、位操作和比較操作,能夠在動態生成的代碼中實現對數據的處理和操作。通過這些指令,開發人員可以靈活地進行算術運算、邏輯運算和比較操作,從而實現各種複雜的演算法和邏輯......本篇之後,將進入第七部分:實戰項目 ...
  • 前言 多表頭表格是一個常見的業務需求,然而WPF中卻沒有預設實現這個功能,得益於WPF強大的控制項模板設計,我們可以通過修改控制項模板的方式自己實現它。 一、需求分析 下圖為一個典型的統計表格,統計1-12月的數據。 此時我們有一個需求,需要將月份按季度劃分,以便能夠直觀地看到季度統計數據,以下為該需求 ...
  • 如何將 ASP.NET Core MVC 項目的視圖分離到另一個項目 在當下這個年代 SPA 已是主流,人們早已忘記了 MVC 以及 Razor 的故事。但是在某些場景下 SSR 還是有意想不到效果。比如某些靜態頁面,比如追求首屏載入速度的時候。最近在項目中回歸傳統效果還是不錯。 有的時候我們希望將 ...
  • System.AggregateException: 發生一個或多個錯誤。 > Microsoft.WebTools.Shared.Exceptions.WebToolsException: 生成失敗。檢查輸出視窗瞭解更多詳細信息。 內部異常堆棧跟蹤的結尾 > (內部異常 #0) Microsoft ...
  • 引言 在上一章節我們實戰了在Asp.Net Core中的項目實戰,這一章節講解一下如何測試Asp.Net Core的中間件。 TestServer 還記得我們在集成測試中提供的TestServer嗎? TestServer 是由 Microsoft.AspNetCore.TestHost 包提供的。 ...
  • 在發現結果為真的WHEN子句時,CASE表達式的真假值判斷會終止,剩餘的WHEN子句會被忽略: CASE WHEN col_1 IN ('a', 'b') THEN '第一' WHEN col_1 IN ('a') THEN '第二' ELSE '其他' END 註意: 統一各分支返回的數據類型. ...
  • 在C#編程世界中,語法的精妙之處往往體現在那些看似微小卻極具影響力的符號與結構之中。其中,“_ =” 這一組合突然出現還真不知道什麼意思。本文將深入剖析“_ =” 的含義、工作原理及其在實際編程中的廣泛應用,揭示其作為C#語法奇兵的重要角色。 一、下劃線 _:神秘的棄元符號 下劃線 _ 在C#中並非 ...