Pipe——高性能IO(二)

来源:https://www.cnblogs.com/yswenli/archive/2019/11/26/11937246.html
-Advertisement-
Play Games

Pipelines - .NET中的新IO API指引(一) Pipelines - .NET中的新IO API指引(二) 關於System.IO.Pipelines的一篇說明 System.IO.Pipelines: .NET高性能IO System.IO.Pipelines 是對IO的統一抽象, ...


Pipelines - .NET中的新IO API指引(一)

Pipelines - .NET中的新IO API指引(二)

關於System.IO.Pipelines的一篇說明

System.IO.Pipelines: .NET高性能IO

System.IO.Pipelines 是對IO的統一抽象,文件、com口、網路等等,重點在於讓調用者註意力集中在讀、寫緩衝區上,典型的就是 IDuplexPipe中的Input Output。

可以理解為將IO類抽象為讀、寫兩個緩衝區。

目前官方實現還處於preview狀態,作者使用Socket和NetworkStream 實現了一個 Pipelines.Sockets.Unofficial

作者在前兩篇中提到使用System.IO.Pipelines 改造StackExchange.Redis,在本篇中作者採用了改造現有的SimplSockets庫來說明System.IO.Pipelines的使用。 

文章中的代碼SimplPipelines,KestrelServer )

## SimplSockets說明

+ 可以單純的發送(Send),也可以完成請求/響應處理(SendRecieve)
+ 同步Api
+ 提供簡單的幀協議封裝消息數據
+ 使用byte[]
+ 服務端可以向所有客戶端廣播消息
+ 有心跳檢測等等     屬於非常典型的傳統Socket庫。

## 作者的改造說明

### 對緩衝區數據進行處理的一些方案及選型

   1. 使用byte[]拷貝出來,作為獨立的數據副本使用,簡單易用但成本高(分配和複製)
   2. 使用 ReadOnlySequence<byte> ,零拷貝,快速但有限制。一旦在管道上執行Advance操作,數據將被回收。在有嚴格控制的服務端處理場景(數據不會逃離請求上下文)下可以使用,言下之意使用要求比較高。
   3. 作為2的擴展方案,將數據載荷的解析處理代碼移至類庫中(處理ReadOnlySequence<byte>),只需將解構完成的數據發佈出來,也許需要一些自定義的structs 映射(map)一下。這裡說的應該是直接將記憶體映射為Struct?
   4. 通過返回Memory<byte> 獲取一份數據拷貝,也許需要從ArrayPool<byte>.Share 池中返回一個大數組;但是這樣對調用者要求較高,需要返回池。並且從Memory<T> 獲取一個T[]屬於高級和不安全的操作。不安全,有風險。( not all Memory<T> is based on T[])
   5. 一個妥協方案,返回一個提供Memory<T>(Span<T>)的東西,並且使用一些明確的顯而易見的Api給用戶,這樣用戶就知道應該如何處理返回結果。比如IDisposable/using這種,在Dispose()被調用時將資源歸還給池。
  
   作者認為,設計一個通用的消息傳遞Api時,方案5更為合理,調用方可以保存一段時間的數據並且不會幹擾到管道的正常工作,也可以很好的利用ArrayPool。如果調用者沒有使用using也不會有什麼大麻煩,只是效率會降低一些,就像使用方案1一樣。
     但是方案的選擇需要充分考慮你的實際場景,比如在StackExchange.Redis 客戶端中使用的是方案3;在不允許數據離開請求上下文時使用方案2.。
  一旦選定方案,以後基本不可能再更改。    針對效率最高的方案2,作者提出的專業建議是 **使用ref struct** 。    此處選擇的是方案5,與方案4的區別就是對Memory<T> 的處理,作者使用 System.Buffers.IMemoryOwner<T>介面
 

 public interface IMemoryOwner<T> : IDisposable
 {
  Memory<T> Memory { get; }
 }

 

   以下為實現代碼,Dispose時歸還借出的數組,並考慮線程安全,避免多次歸還(very bad)。
   複製代碼
private sealed class ArrayPoolOwner<T>:IMemoryOwner<T>{
 private readonly int _length;
 private T[] _oversized;
 internal ArrayPoolOwner(T[] oversized,int length){
  _length=length;
  _oversized=oversized;
 }
 public Memory<T> Memory=>new Memory<T>(GetArray(),0,_length);
 private T[] GetArray()=>Interlocked.CompareExchange(ref _oversized,null,null)
  ?? throw new ObjectDisposedException(ToString());
 public void Dispose(){
  var arr=Interlocked.Exchange(ref _oversized,null);
  if(arr!=null) ArrayPool<T>.Shared.Return(arr);
 }
}
複製代碼

 

  Dispose後如果再次調用Memory將會失敗,即 使用時 using,不要再次使用。 **對ArrayPool的一些說明**
+ 從ArrayPool借出的數組比你需要的要大,你給定的大小在ArrayPool看來屬於下限(不可小於你給定的大小),見:ArrayPool<T>.Shared.Rent(int minimumLength);
+ 歸還時數組預設不清空,因此你借出的數組內可能會有垃圾數據;如果需要清空,在歸還時使用 ArrayPool<T>.Shared.Return(arr,true) ;   作者對ArrayPool的一些建議: 
增加 IMemoryOwner<T> RentOwned(int length),T[] Rent(int minimumLength) 及借出時清空數組,歸還時清空數組的選項。
 這裡的想法是通過IMemoryOwner<T>實現一種所有權的轉移,典型調用方法如下
 
複製代碼
 void DoSomething(IMemoryOwner<byte> data){
  using(data){
       // ... other things here ...
                DoTheThing(data.Memory);
  }
  // ... more things here ...
 }
複製代碼

 

 通過ArrayPool的借、還機制避免頻繁分配。    **作者的警告:**
 + 不要把data.Memory 單獨取出亂用,using完了就不要再操作它了(這種錯誤比較基礎)
 + 有人會用MemoryMarshal搞出數組使用,作者認為可以實現一個 MemoryManager<T>(ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>)讓.Span如同.Memory一樣失敗。
 ---- 作者也挺糾結(周道)的 :)。 使用  ReadOnlySequence<T> 填充ArrayPoolOwner(構造,實例化)
  複製代碼
public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source)
    {
        if (source.IsEmpty) return Empty<T>();
        int len = checked((int)source.Length);
        var arr = ArrayPool<T>.Shared.Rent(len);//借出
        source.CopyTo(arr);
        return new ArrayPoolOwner<T>(arr, len);//dispose時歸還
    }
複製代碼

 

### 基本API

  服務端和客戶端雖然不同但代碼有許多重疊的地方,比如都需要某種線程安全機制的寫入,需要某種讀迴圈來處理接收的數據,因此可以共用一個基類。
基類中使用IDuplexPipe(包括input,output兩個管道)作為管道。
複製代碼
public abstract class SimplPipeline : IDisposable
    {
        private IDuplexPipe _pipe;
        protected SimplPipeline(IDuplexPipe pipe)
            => _pipe = pipe;
        public void Dispose() => Close();
        public void Close() {/* burn the pipe*/}
    }
複製代碼

 

首先,需要一種線程安全的寫入機制並且不會過度阻塞調用方。在原SimplSockets(包括StackExchange.Redis v1)中使用消息隊列來處理。調用方Send時同步的將消息入隊,在將來的某刻,消息出隊並寫入到Socket中。此方式存在的問題
+ 有許多移動的部分
+ 與“pipelines”有些重覆 管道本身即是隊列,本身具備輸出(寫、發送)緩衝區,沒必要再增加一個隊列,直接把數據寫入管道即可。取消原有隊列只有一些小的影響,在StackExchange.Redis v1 中使用隊列完成優先順序排序處理(隊列跳轉),作者表示不擔心這一點。 **寫入Api設計**
 + 不一定時同步的
 + 調用方可以單純的傳入一段記憶體數據(ReadOnlyMember<byte>),或者是一個(IMemoryOwner<byte>)由Api寫入後進行清理。
  + 先假設讀、寫分開(暫不考慮響應)

複製代碼
protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)//調用方不再使用payload,需要我們清理
    {
        using (payload)
  {
   await WriteAsync(payload.Memory, messageId);
  }
 }
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);//調用方自己清理
複製代碼

 

 messageId標識一條消息,寫入消息頭部, 用於之後處理響應回覆信息。
   返回值使用ValueTask因為寫入管道通常是同步的,只有管道執行Flush時才可能是非同步的(大多數情況下也是同步的,除非在管道被備份時)。

### 寫入與錯誤

首先需要保證單次寫操作,lock在此不合適,因為它不能與非同步操作很好的協同。考慮flush有可能是非同步的,導致後續(continuation )部分可能會在另外的線程上。這裡使用與非同步相容的SemaphoreSlim。
下麵是一條指南:**一般來說, 應用程式代碼應針對可讀性進行優化;庫代碼應針對性能進行優化。**
以下為機翻原文
您可能同意也可能不同意這一點, 但這是我編寫代碼的一般指南。我的意思是,類庫代碼往往有一個單一的重點目的, 往往由一個人的經驗可能是 "深刻的, 但不一定是    廣泛的" 維護;你的大腦專註於那個領域, 用奇怪的長度來優化代碼是可以的。相反,應用程式代碼往往涉及更多不同概念的管道-"寬但不一定深" (深度隱藏在各種庫      中)。應用程式代碼通常具有更複雜和不可預知的交互, 因此重點應放在可維護和 "明顯正確" 上。
  基本上, 我在這裡的觀點是, 我傾向於把很多註意力集中在通常不會放入應用程式代碼中的優化上, 因為我從經驗和廣泛的基準測試中知道它們真的很重要。所以。。。我要做一些看起來很奇怪的事情, 我希望你和我一起踏上這段旅程。   “明顯正確”的代碼 複製代碼
private readonly SemaphoreSlim _singleWriter= new SemaphoreSlim(1);
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    await _singleWriter.WaitAsync();
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        await writer.WriteAsync(payload);
    }
    finally
    {
        _singleWriter.Release();
    }
}
複製代碼

 

這段代碼沒有任何問題,但是即便所有部分都是同步完成的,就會產生多餘的狀態機-------大概是 不是所有地方都需要非同步處理 的意思。
通過兩個問題進行重構
- 單次寫入是否沒有競爭?(無人爭用)
- Flush是否為同步 重構,將原WriteAsync 更名為 WriteAsyncSlowPath,增加新的WriteAsync 作者的“一些看起來很奇怪的” 實現   複製代碼
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    // try to get the conch; if not, switch to async
//writer已經被占用,非同步
    if (!_singleWriter.Wait(0))
        return WriteAsyncSlowPath(payload, messageId);
    bool release = true;
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        var write = writer.WriteAsync(payload);
        if (write.IsCompletedSuccessfully) return default;
        release = false;
        return AwaitFlushAndRelease(write);
    }
    finally
    {
        if (release) _singleWriter.Release();
    }
}
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{
    try { await flush; }
    finally { _singleWriter.Release(); }
}
複製代碼

 

三個地方
1. _singleWriter.Wait(0) 意味著writer處於空閑狀態,沒有其他人在調用
2. write.IsCompletedSuccessfully 意味著writer同步flush
3. 輔助方法 AwaitFlushAndRelease 處理非同步flush情況
-------------------------------------------------------------------------------------

### 協議頭處理

協議頭由兩個int組成,小端,第一個是長度,第二個是messageId,共8位元組。
複製代碼
void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    var span = writer.GetSpan(8);
    BinaryPrimitives.WriteInt32LittleEndian(
        span, length);
    BinaryPrimitives.WriteInt32LittleEndian(
        span.Slice(4), messageId);
    writer.Advance(8);
}
複製代碼

 

### 管道客戶端實現 發送
 
複製代碼
public class SimplPipelineClient : SimplPipeline
{
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();
        int messageId;
        lock (_awaitingResponses)
        {
            messageId = ++_nextMessageId;
            if (messageId == 0) messageId = 1;
            _awaitingResponses.Add(messageId, tcs);
        }
        await WriteAsync(message, messageId);
        return await tcs.Task;
    }
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}
複製代碼

 

- _awaitingResponses 是個字典,保存已經發送的消息,用於將來處理對某條(messageId)消息的回覆。 ### 接收迴圈   複製代碼
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{
   try
   {
       while (!cancellationToken.IsCancellationRequested)
       {
           var readResult = await reader.ReadAsync(cancellationToken);
           if (readResult.IsCanceled) break;
           var buffer = readResult.Buffer;
           var makingProgress = false;
           while (TryParseFrame(ref buffer, out var payload, out var messageId))
           {
               makingProgress = true;
               await OnReceiveAsync(payload, messageId);
           }
           reader.AdvanceTo(buffer.Start, buffer.End);
           if (!makingProgress && readResult.IsCompleted) break;
       }
       try { reader.Complete(); } catch { }
   }
   catch (Exception ex)
   {
       try { reader.Complete(ex); } catch { }
   }
}
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);
複製代碼

 

接收緩衝區里什麼時間會有什麼東西由發送方和系統環境決定,因此延遲是必然的,所以這裡全部按非同步處理就好。
- TryParseFrame 讀出緩衝區數據,根據幀格式解析出實際數據、id等
- OnRecieveAsync 處理數據,比如對於回覆/響應的處理等
- reader中的數據讀出後儘快Advance一下,通知管道你的讀取進度; 解析幀   複製代碼
private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload, out int messageId)
{
    if (input.Length < 8)
    {   // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }
    int length;
    if (input.First.Length >= 8)
    {   // already 8 bytes in the first segment
        length = ParseFrameHeader(
            input.First.Span, out messageId);
    }
    else
    {   // copy 8 bytes into a local span
        Span<byte> local = stackalloc byte[8];
        input.Slice(0, 8).CopyTo(local);
        length = ParseFrameHeader(
            local, out messageId);
    }
    // do we have the "length" bytes?
    if (input.Length < length + 8)
    {
        payload = default;
        return false;
    }
    // success!
    payload = input.Slice(8, length);
    input = input.Slice(payload.End);
    return true;
}
複製代碼

 

緩衝區是不連續的,一段一段的,像鏈表一樣,第一段就是input.First。
代碼很簡單,主要演示一些用法;
輔助方法
複製代碼
static int ParseFrameHeader(
    ReadOnlySpan<byte> input, out int messageId)
{
    var length = BinaryPrimitives
            .ReadInt32LittleEndian(input);
    messageId = BinaryPrimitives
            .ReadInt32LittleEndian(input.Slice(4));
    return length;
}
複製代碼

 


OnReceiveAsync
  複製代碼
protected override ValueTask OnReceiveAsync(
    ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId != 0)
    {   // request/response
        TaskCompletionSource<IMemoryOwner<byte>> tcs;
        lock (_awaitingResponses)
        {
            if (_awaitingResponses.TryGetValue(messageId, out tcs))
            {
                _awaitingResponses.Remove(messageId);
            }
        }
        tcs?.TrySetResult(payload.Lease());
    }
    else
    {   // unsolicited
        MessageReceived?.Invoke(payload.Lease());
    }
    return default;
}
複製代碼

 


到此為止,其餘部分主要是一些服務端和其他功能實現及benchmark。。。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 之前一直是在Ubuntu下進行Python和Django開發,最近換了電腦,把在Virtual Box 下跑的Ubuntu開發機挪過來總是頻繁崩潰,索性就嘗試把開發環境挪到Windows主力機了。 不得不說,巨硬家這幾年在多元並包方面真的是走在了世界前列。特別是VSCode,兩年前已經成為了我在Li ...
  • Java操作資料庫——手動實現資料庫連接池 摘要:本文主要學習瞭如何手動實現一個資料庫連接池,以及在這基礎上的一些改進。 部分內容來自以下博客: https://blog.csdn.net/soonfly/article/details/72731144 一個簡單的資料庫連接池 連接池工具類 連接池 ...
  • 1 #include<iostream> 2 using namespace std; 3 struct tree{ 4 int l,r,sum; 5 }t[1000001]; 6 int a[1000001],n,p,x,y,m; 7 inline void build(int root,int ...
  • 為什麼標準庫里要有traits? 我們先回憶一下,標準庫提供的演算法的一些特征: 參數一般包括iterator。 要根據iterator的種類,和iterator包裝的元素的類型等信息,來決定使用最優化的演算法。 比如如果是vector的iterator,那麼就可以使用+, 操作; 如果是list的it ...
  • 併發和並行 在真正開始聊本文的主題之前,我們先來回顧下兩個老生常談的概念:併發和並行。 併發 :是指多個線程任務在同一個CPU上快速地輪換執行,由於切換的速度非常快,給人的感覺就是這些線程任務是在同時進行的,但其實併發只是一種邏輯上的同時進行; 並行 :是指多個線程任務在不同CPU上同時進行,是真正 ...
  • 1. 成果 獻祭了周末的晚上,成功召喚出了上面的番茄鐘。正當我在感慨“不愧是Shadow大人,這難道就是傳說中的五彩斑斕的黑?” “那才不是什麼陰影效果,那是發光效果。”被路過的老婆吐槽了。 繫系系,老婆說的都系對的。我還以為我在做陰影動畫,現在只好改博客標題了? 要實現上面的動畫效果,首先使用Co ...
  • 面試的時候估計都會被問過,什麼是委托,事件是不是一種委托?委托的優點都是什麼?我在項目中經常使用,但是平時不註意整理概念性知識,回答起來像是囫圇吞棗,答不出個所以然來。今天周末抽出來一些時間,靜下心來整理下。下麵我將採用一問一答的性質來整理和記錄。 1.什麼是委托? 委托是一種類型安全的對象,它是指 ...
  • 約束 public abstract class BaseModel { public int Id { get; set; } } 連接字元串 public static readonly string Customers = ConfigurationManager.ConnectionStri ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...