C#非同步案例一則

来源:https://www.cnblogs.com/pasoraku/archive/2019/12/02/11971944.html

場景 生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步. 下麵用一個Asp.NetCore Web-API項目來模擬 創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串. 實現如下: 接著 ...


場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要非同步.

下麵用一個Asp.NetCore Web-API項目來模擬

  創建兩個API, 一個Get(), 一個Set(), Get返回一個字元串, Set放入一個字元串, Get返回的就是Set進去的字元串.

  實現如下:  

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接著在StartUp中把IMessageQueue給註入了.

services.AddSingleton<IMessageQueue, MessageQueue>();

運行後, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/

可以看到成功返回了xxx

第二步, value欄位改為隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值

為了線程安全, 這裡使用了ConcurrentQueue<T>

代碼如下:

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那麼此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.

調用/api/foo/set/

三, 非同步阻塞

再增加需求, 調換get和set的順序,先get後set模擬非同步, (我這裡的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之後會不斷地轉圈直到我用set介面放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

但是以上兩個方案都是基於Thread的, .Net4.0之後伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用

方案C: 修改後代碼如下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.

擴展RPC

這裡的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個非同步的RPC .

比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的非同步方案.

代碼如下, 首先新增兩個介面:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func);
    Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接著定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛纔新加的兩個介面:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

同樣的, 寫兩個Web API介面, 一個請求等待結果 一個負責處理工作

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class作為請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask介面的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待

然後再打開一個選項卡訪問answer介面, 處理剛纔放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.

這裡用到的關鍵類型是TaskCompletionSource<T>.

再擴展

如果是個分散式系統, 請求和處理邏輯不是在一個程式里呢? 那麼這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之後再找到Id對於的TCS.SetResult()

 

 

 

 

  


您的分享是我們最大的動力!

更多相關文章
  • 用於註解說明解釋程式的文字就是註釋。 提高代碼的閱讀性;調試程式的重要方法。 java中的註釋類型: 單行註釋 // 多行註釋 /* */ 文檔註釋 /** *文檔註釋 *輸出hello world *@author fang *@version 1.0.0 */ 註釋是一個程式員必須具備的良好編程 ...
  • [TOC] 1. 對Django的認識? 2. Django 、Flask、Tornado的對比 3. 什麼是wsgi,uwsgi,uWSGI? 4. django請求的生命周期? 5. 簡述什麼是FBV和CBV? 6. 如何給CBV的程式添加裝飾器? 7. 簡述MVC和MTV 8. django路 ...
  • 1、步驟 將java代碼編寫到擴展名為.java的文件中(擴展名的查看) 新建文本文檔,重命名為Test.java 以記事本方式打開 寫入代碼 public class Test{ public static void main(String[] args){ System.out.print("H ...
  • 直接看網址吧,所有的GO GUI代碼!~~~~ "網址" ...
  • 1、下載JDK 下載地址 https://www.oracle.com/technetwork/java/javase/downloads/index.html 2、安裝JDK 傻瓜安裝 3、配置環境 新建JAVA_HOME環境變數 新建classpath環境變數 新增Path變數 4、查看是否安裝 ...
  • 1.報錯,too many open files 查詢方法:查看linux允許的最大句柄數,命令ulimit -a。然後使用命令lsof -p 進程id可以查看單個進程所有打開的文件詳情,使用命令lsof -p 進程id | wc -l可以統計進程打開了多少文件,如果文件數過多使用lsof -p 進 ...
  • Serverless 技術為開發人員提供了一種快速而獨立的方式將實現投入生產。這種技術在企業的技術棧中日益流行,自 2017 年以來,它一直是 ThoughtWorks 技術雷達的實驗級別的技術[譯註:技術雷達是 ThoughtWorks 每半年發佈的前沿技術解析]。 本篇文章的第一部分介紹了... ...
  • view: <form method="post" enctype="multipart/form-data" action="@Url.Action("Upload")"> <input type="file" id="file" name="file"/> <button>提交</button> ...
一周排行
  • 比如要拆分“呵呵呵90909086676喝喝999”,下麵當type=0返回的是中文字元串“呵呵呵,喝喝”,type=1返回的是數字字元串“90909086676,999”, private string GetStrings(string str,int type=0) { IList<strin ...
  • Swagger一個優秀的Api介面文檔生成工具。Swagger可以可以動態生成Api介面文檔,有效的降低前後端人員關於Api介面的溝通成本,促進項目高效開發。 1、使用NuGet安裝最新的包:Swashbuckle.AspNetCore。 2、編輯項目文件(NetCoreTemplate.Web.c ...
  • 2020 年 7 月 30 日, 由.NET基金會和微軟 將舉辦一個線上和為期一天的活動,包括 微軟 .NET 團隊的演講者以及社區的演講者。本次線上大會 專註.NET框架構建微服務,演講者分享構建和部署雲原生應用程式的最佳實踐、模式、提示和技巧。有關更多信息和隨時瞭解情況:https://focu... ...
  • #abp框架Excel導出——基於vue #1.技術棧 ##1.1 前端採用vue,官方提供 UI套件用的是iview ##1.2 後臺是abp——aspnetboilerplate 即abp v1,https://github.com/aspnetboilerplate/aspnetboilerp ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 作者:碧茂大數據 PS:如有需要Python學習資料的小伙伴可以加下方的群去找免費管理員領取 input()輸入 Python提供了 input() 內置函數從標準輸入讀入一 ...
  • 從12年到20年,python以肉眼可見的趨勢超過了java,成為了當今It界人人皆知的編程語言。 python為什麼這麼火? 網路編程語言搜索指數 適合初學者 Python具有語法簡單、語句清晰的特點,這就讓初學者在學習階段可以把精力集中在編程對象和思維方法上。 大佬都在用 Google,YouT ...
  • 在社會上存在一種普遍的對培訓機構的學生一種歧視的現象,具體表現在,比如:當你去公司面試的時候,一旦你說了你是培訓機構出來的,那麼基本上你就涼了,那麼你瞞著不說,然後又通過了面試成功入職,但是以後一旦在公司被髮現有培訓經歷,可能會面臨被降薪,甚至被辭退,培訓機構出來的學生,在用人單位眼裡就是能力低下的 ...
  • from typing import List# 這道題看了大佬寫的代碼,經過自己的理解寫出來了。# 從最外圍的四周找有沒有為O的,如果有的話就進入深搜函數,然後深搜遍歷# 判斷上下左右的位置是否為Oclass Solution: def solve(self, board: List[List[s ...
  • import requests; import re; import os; # 1.請求網頁 header = { "user-agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, li ...
  • import requests; import re; import os; import parsel; 1.請求網頁 header = { "user-agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537. ...