C#非同步案例一則

来源:https://www.cnblogs.com/pasoraku/archive/2019/12/02/11971944.html

場景 生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步. 下麵用一個Asp.NetCore Web-API項目來模擬 創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串. 實現如下: 接著 ...


場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步.

下麵用一個Asp.NetCore Web-API項目來模擬

  創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串.

  實現如下:  

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接著在StartUp中把IMessageQueue給註入了.

services.AddSingleton<IMessageQueue, MessageQueue>();

運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/

可以看到成功返回了xxx

第二步, value欄位改為隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值

為了線程安全, 這裡使用了ConcurrentQueue<T>

代碼如下:

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那麼此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.

調用/api/foo/set/

三, 非同步阻塞

再增加需求, 調換get和set的順序,先get後set模擬非同步, (我這裡的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之後會不斷地轉圈直到我用set介面放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

但是以上兩個方案都是基於Thread的, .Net4.0之後伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用

方案C: 修改後代碼如下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.

擴展RPC

這裡的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個非同步的RPC .

比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的非同步方案.

代碼如下, 首先新增兩個介面:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func);
    Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接著定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛纔新加的兩個介面:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

同樣的, 寫兩個Web API介面, 一個請求等待結果 一個負責處理工作

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class作為請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask介面的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待

然後再打開一個選項卡訪問answer介面, 處理剛纔放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.

這裡用到的關鍵類型是TaskCompletionSource<T>.

再擴展

如果是個分散式系統, 請求和處理邏輯不是在一個程式里呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之後再找到Id對於的TCS.SetResult()

 

 

 

 

  


您的分享是我們最大的動力!

更多相關文章
  • 用於註解說明解釋程式的文字就是註釋。 提高代碼的閱讀性;調試程式的重要方法。 java中的註釋類型: 單行註釋 // 多行註釋 /* */ 文檔註釋 /** *文檔註釋 *輸出hello world *@author fang *@version 1.0.0 */ 註釋是一個程式員必須具備的良好編程 ...
  • [TOC] 1. 對Django的認識? 2. Django 、Flask、Tornado的對比 3. 什麼是wsgi,uwsgi,uWSGI? 4. django請求的生命周期? 5. 簡述什麼是FBV和CBV? 6. 如何給CBV的程式添加裝飾器? 7. 簡述MVC和MTV 8. django路 ...
  • 1、步驟 將java代碼編寫到擴展名為.java的文件中(擴展名的查看) 新建文本文檔,重命名為Test.java 以記事本方式打開 寫入代碼 public class Test{ public static void main(String[] args){ System.out.print("H ...
  • 直接看網址吧,所有的GO GUI代碼!~~~~ "網址" ...
  • 1、下載JDK 下載地址 https://www.oracle.com/technetwork/java/javase/downloads/index.html 2、安裝JDK 傻瓜安裝 3、配置環境 新建JAVA_HOME環境變數 新建classpath環境變數 新增Path變數 4、查看是否安裝 ...
  • 1.報錯,too many open files 查詢方法:查看linux允許的最大句柄數,命令ulimit -a。然後使用命令lsof -p 進程id可以查看單個進程所有打開的文件詳情,使用命令lsof -p 進程id | wc -l可以統計進程打開了多少文件,如果文件數過多使用lsof -p 進 ...
  • Serverless 技術為開發人員提供了一種快速而獨立的方式將實現投入生產。這種技術在企業的技術棧中日益流行,自 2017 年以來,它一直是 ThoughtWorks 技術雷達的實驗級別的技術[譯註:技術雷達是 ThoughtWorks 每半年發佈的前沿技術解析]。 本篇文章的第一部分介紹了... ...
  • view: <form method="post" enctype="multipart/form-data" action="@Url.Action("Upload")"> <input type="file" id="file" name="file"/> <button>提交</button> ...
一周排行
  • 前言 上一篇文章主要介紹了ObjectPool的理論知識,再來介紹一下Microsoft.Extensions.ObjectPool是如何實現的. 核心組件 ObjectPool ObjectPool 是一個泛型抽象介面,他抽象了兩個方法Get和Return Get方法用於從對象池獲取到可用對象,如 ...
  • 國內優秀的WPF開源控制項庫,Panuon.UI的優化版本。一個漂亮的、使用樣式與附加屬性的WPF UI控制項庫,值得向大家推薦使用與學習。 今天站長(Dotnet9,站長網址:https://dotnet9.com, 微信公眾號:dotnet9_com)推薦另一款開源的WPF控制項庫(PanuonUI. ...
  • WGS-84坐標系:全球定位系統使用,GPS、北斗等 GCJ-02坐標系:中國地區使用,由WGS-84偏移而來 BD-09坐標系:百度專用,由GCJ-02偏移而來 (PS:源於項目需求,本來是想讀圖片的經緯度顯示在百度離線地圖上的。後來發現定位偏差太大,仔細一想,原來是圖片和百度使用的坐標系不一樣。 ...
  • .NET Core3.1發佈 我們很高興宣佈.NET Core 3.1的發佈。實際上,這隻是對我們兩個多月前發佈的.NET Core 3.0的一小部分修複和完善。最重要的是.NET Core 3.1是長期支持(LTS)版本,並且將支持三年。和過去一樣,我們希望花一些時間來發佈下一個LTS版本。額外的 ...
  • based on https://stackoverflow.com/questions/659013/accessing-a-shared-file-unc-from-a-remote-non-trusted-domain-with-credentials ...
  • private static void PathCopyFilesWithOriginalFolder() { int sourceFilesNum = 0; try { string sourceDir = @"E:\Source"; string destDir = @"E:\Dest"; st... ...
  • 前言 上一次資料庫災備和性能優化後,資料庫專家建議,在不擴容的情況下,客戶端不能再頻繁的掃描資料庫了!一句驚醒夢中人,因為我也發現資料庫越來越卡了,自從上個項目上線後,就出現了這個情況。後來分析其原因,發現客戶端每3秒中掃描一次資料庫,一共5000+客戶端,可想而知,頻繁掃描嚴重影響到資料庫性能。所 ...
  • 2019.12.4今天開通博客,跌跌撞撞學了3年C#,感覺有了基礎但還不夠深入,有些東西學了又忘,特此開通博客做一個記錄,記錄下以後學習中的每一個知識點,再接再厲,每天進步一點點!!!!!! ...
  • 本人剛接觸.net core 由於公司項目需要部署在Linux上 近些日子學習和網上大面積搜教程 我在這給大家歸攏歸攏借鑒的教程做了套方案(我寫的可以實現 但不一定是最好的 僅供參考) 我只用過core3.0 之前的版本沒接觸過 首先需要使用Nginx反代理的項目那一定是web框架的ASP.NET ...
  • WinFrm應用程式調用WebService服務 關於WebService的創建、發佈與部署等相關操作不再贅述,傳送門如下:C# VS2019 WebService創建與發佈,並部署到Windows Server 2012R 此篇記錄一下客戶端的調用,以便後續學習使用,不足之處請指出。 建立WinF ...
x