C#高性能TCP服務的多種實現方式

来源:http://www.cnblogs.com/gaochundong/archive/2016/02/05/csharp_tcp_service_models.html
-Advertisement-
Play Games

本篇文章的主旨是使用 .NET/C# 實現 TCP 高性能服務的不同方式,包括但不限於如下內容:APM 方式,即 Asynchronous Programming Model;TAP 方式,即 Task-based Asynchronous Pattern;SAEA 方式,即 SocketAsync...


哎~~ 想想大部分園友應該對 "高性能" 字樣更感興趣,為了吸引眼球所以標題中一定要突出,其實我更喜歡的標題是《猴賽雷,C#編寫TCP服務的花樣姿勢!》

本篇文章的主旨是使用 .NET/C# 實現 TCP 高性能服務的不同方式,包括但不限於如下內容:

在 .NET/C# 中對於 Socket 的支持均是基於 Windows I/O Completion Ports 完成埠技術的封裝,通過不同的 Non-Blocking 封裝結構來滿足不同的編程需求。以上方式均已在 Cowboy.Sockets 中有完整實現,並且 APM 和 TAP 方式已經在實際項目中應用。Cowboy.Sockets 還在不斷的進化和完善中,如有任何問題請及時指正。

雖然有這麼多種實現方式,但抽象的看,它們是一樣一樣的,用兩個 Loop 即可描述:Accept LoopRead Loop,如下圖所示。(這裡提及的 "Loop" 指的是一種迴圈方式,而非特指 while/for 等關鍵字。)

  • 在任何 TCP Server 的實現中,一定存在一個 Accept Socket Loop,用於接收 Client 端的 Connect 請求以建立 TCP Connection。
  • 在任何 TCP Server 的實現中,一定存在一個 Read Socket Loop,用於接收 Client 端 Write 過來的數據。

如果 Accept 迴圈阻塞,則會導致無法快速的建立連接,服務端 Pending Backlog 滿,進而導致 Client 端收到 Connect Timeout 的異常。如果 Read 迴圈阻塞,則顯然會導致無法及時收到 Client 端發過來的數據,進而導致 Client 端 Send Buffer 滿,無法再發送數據。

從實現細節的角度看,能夠導致服務阻塞的位置可能在:

  1. Accept 到新的 Socket,構建新的 Connection 需要分配各種資源,分配資源慢;
  2. Accept 到新的 Socket,沒有及時觸發下一次 Accept;
  3. Read 到新的 Buffer,判定 Payload 消息長度,判定過程長;
  4. Read 到新的 Buffer,發現 Payload 還沒有收全,繼續 Read,則可能會導致一次 Buffer Copy;
  5. Payload 接收完畢,進行 Serialization 轉成可識別的 Protocol Message,序列化慢;
  6. 由 Business Module 來處理相應的 Protocol Message,處理過程慢;

1-2 涉及到 Accept 過程和 Connection 的建立過程,3-4 涉及到 ReceiveBuffer 的處理過程,5-6 涉及到應用邏輯側的實現。

Java 中著名的 Netty 網路庫從 4.0 版本開始對於 Buffer 部分做了全新的嘗試,採用了名叫 ByteBuf 的設計,實現 Buffer Zero Copy 以減少高併發條件下 Buffer 拷貝帶來的性能損失和 GC 壓力。DotNettyOrleans ,Helios 等項目正在嘗試在 C# 中進行類似的 ByteBuf 的實現。

APM 方式:TcpSocketServer

TcpSocketServer 的實現是基於 .NET Framework 自帶的 TcpListenerTcpClient 的更進一步的封裝,採用基於 APM 的 BeginXXX 和 EndXXX 介面實現。

TcpSocketServer 中的 Accept Loop 指的就是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每一個建立成功的 Connection 由 TcpSocketSession 來處理,所以 TcpSocketSession 中會包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 通過暴露 Event 來實現 Connection 的建立與斷開和數據接收的通知。

  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

使用也是簡單直接,直接訂閱事件通知。

  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的實現是基於 .NET Framework 自帶的 TcpListener 和 TcpClient 的更進一步的封裝,採用基於 TAP 的 async/await 的 XXXAsync 介面實現。

然而,實際上 XXXAsync 並沒有創建什麼神奇的效果,其內部實現只是將 APM 的方法轉換成了 TAP 的調用方式。

  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task<Socket> AcceptSocketAsync()
  {
      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task<TcpClient> AcceptTcpClientAsync()
  {
      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每一個建立成功的 Connection 由 AsyncTcpSocketSession 來處理,所以 AsyncTcpSocketSession 中會包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

為了將 async/await 非同步到底,AsyncTcpSocketServer 所暴露的介面也同樣是 Awaitable 的。

  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }

使用時僅需將一個實現了該介面的對象註入到 AsyncTcpSocketServer 的構造函數中即可。

  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

當然,對於介面的實現也不是強制了,也可以在構造函數中直接註入方法的實現。

  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的簡寫。SocketAsyncEventArgs 是 .NET Framework 3.5 開始支持的一種支持高性能 Socket 通信的實現。SocketAsyncEventArgs 相比於 APM 方式的主要優點可以描述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是說,優點就是無需為每次調用都生成 IAsyncResult 等對象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推薦步驟如下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重點在於池化(Pooling),池化的目的就是為了重用和減少運行時分配和垃圾回收的壓力。

TcpSocketSaeaServer 即是對 SocketAsyncEventArgs 的應用和封裝,並實現了 Pooling 技術。TcpSocketSaeaServer 中的重點是 SaeaAwaitable 類,SaeaAwaitable 中內置了一個 SocketAsyncEventArgs,並通過 GetAwaiter 返回 SaeaAwaiter 來支持 async/await 操作。同時,通過 SaeaExtensions 擴展方法對來擴展 SocketAsyncEventArgs 的 Awaitable 實現。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 則是一個 QueuedObjectPool<SaeaAwaitable> 的衍生實現,用於池化 SaeaAwaitable 實例。同時,為了減少 TcpSocketSaeaSession 的構建過程,也實現了 SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();
  
      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }

每一個建立成功的 Connection 由 TcpSocketSaeaSession 來處理,所以 TcpSocketSaeaSession 中會包含 Read Loop,

  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);
  
      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;
  
      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }

同樣,TcpSocketSaeaServer 對外所暴露的介面也同樣是 Awaitable 的。

  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起來也是簡單直接:

  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

RIO 方式:TcpSocketRioServer

從 Windows 8.1 / Windows Server 2012 R2 開始,微軟推出了 Registered I/O Networking Extensions 來支持高性能 Socket 服務的實現,簡稱 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到目前為止,.NET Framework 還沒有推出對 RIO 的支持,所以若想在 C# 中實現 RIO 則只能通過 P/Invoke 方式,RioSharp 是開源項目中的一個比較完整的實現。

Cowboy.Sockets 直接引用了 RioSharp 的源代碼,放置在 Cowboy.Sockets.Experimental 名空間下,以供實驗和測試使用。

同樣,通過 TcpSocketRioServer 來實現 Accept Loop,

_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};

通過 TcpSocketRioSession 來處理 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

測試代碼一如既往的類似:

  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

參考資料

本篇文章《C#高性能TCP服務的多種實現方式》由 Dennis Gao 發表自博客園個人博客,未經作者本人同意禁止以任何的形式轉載,任何自動的或人為的爬蟲轉載行為均為耍流氓。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Linux下怎麼查看當前系統的版本: uname -r 功能說明:uname用來獲取電腦和操作系統的相關信息。 語 法:uname [-amnrsvpio][--help][--version] 補充說明:uname可顯示linux主機所用的操作系統的版本、硬體的名稱等基本信息。 參 數: -a或–
  • 我的環境:主機是win7的,虛擬機是VWare Workstation 6.0 ,linux系統為Red Hat Enterprise Linux 5 64位 各軟體版本:jdk是jdk-6u35-linux-x64.bin,tomcat是apache-tomcat-6.0.35.tar.gz,數據
  • #!/bin/bashDATE=`date +%Y-%m-%d-%H:%M -d -3minute`USER=rootPASSWORD=mayboBACKUP_DIR='/home/mysqlbak/'LOG_DIR='/home/mysqlbak/mysqlbak_log.log'DATABASE
  • 一. 理論分析1. 幾個概念:FIMC : Fully Interactive Mobile Camera (完全互動式移動攝像機)FIMD: Fully Interactive Mobile Display (完全互動式移動顯示設備)2. 設置VCLK在VIDCON0中bit[3:2]-->Sel
  • 由於前幾次都沒能寫完,這次年底總算有自由時間了,又想繼續搗鼓一下。於是下載了VS 2015專業版(不知為什麼我特別鐘愛專業版,而不喜歡企業版)。由於以前的教訓,我這次決定寫一個極簡的Deom,簡到什麼程度呢?簡單到只實現添加、修改欄目,用戶登錄後可以添加管理文章、管理員登陸後可以修改網站設置(也就標...
  • 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Web; 5 using System.Web.UI; 6 using System.Web.UI.WebControls;
  • 之前就是說過“一個項目有很多重要的步驟以及功能”,那我們現在就來看看對於KTV項目來說;後臺是處於什麼樣的重要作用! 首先就得瞭解KTV後臺的一些功能了: 1.歌曲管理 、歌手管理 、設置資源路徑 2.新增歌手、歌手查詢、新增歌曲、歌曲查詢、更改歌曲路徑以及退出點歌系統 一.後臺登錄界面 01.判斷
  • 這裡是簡要的一些微信支付,公眾號支付的一些流程,包括以下配置信息,錯誤信息等,並不全面,但是希望能夠幫助到大家,不喜勿噴,我也是新手,也當是給自己寫了一個筆記,加深一下影響,以後再遇到,也能方便自己的學習
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...