[dotnet core]使用Peach簡化Socket網路通訊協議開發

来源:https://www.cnblogs.com/xuanye/archive/2019/02/27/a-lightweight-and-fast-socket-lib-base-on-dotnetty.html
-Advertisement-
Play Games

Peach是基於[DotNetty][1]的Socket網路通訊幫助類庫,可以幫助開發者簡化使用DotNetty,關於DotNetty可參考我之前的[這篇文章][2]。 Peach內置實現了一個基於文本協議的CommandLineProtocol,下麵的實例以這個協議展開,最後以[DotBPE][... ...


Peach是基於DotNetty的Socket網路通訊幫助類庫,可以幫助開發者簡化使用DotNetty,關於DotNetty可參考我之前的這篇文章
Peach內置實現了一個基於文本協議的CommandLineProtocol,下麵的實例以這個協議展開,最後以DotBPE中Amp協議來介紹下如何擴展自定義協議。

Github地址: https://github.com/xuanye/Peach

QuickStart 使用

添加引用

dotnet nuget add Peach

要使用Peach編寫網路程式,一般只需要三個步驟

  1. 實現協議傳輸消息IMessage
  2. 實現協議打包和解包邏輯IProtocol
  3. 實現ISocketService完成服務端邏輯編寫

    在快速開始的實例中,我們使用內置的CommandLineProtocol,所以省去了步驟1,2讓我們開始吧!

1 服務端

1.1 實現MyService

可分別重寫

  1. OnConnected 有客戶端連接上的事件
  2. OnDisConnected 客戶端斷開連接時的事件
  3. OnReceive 收到客戶端消息的事件
  4. OnException 發生異常時的事件,如異常斷開
public class MyService : Peach.AbsSocketService<Peach.Messaging.CommandLineMessage>
{
    private readonly ILogger<MyService> _logger;

    public MyService(ILogger<MyService> logger)
    {
        _logger = logger;
    }
    public override void OnConnected(ISocketContext<CommandLineMessage> context)
    {
        _logger.LogInformation("client connected from {0}", context.RemoteEndPoint);
        base.OnConnected(context);
    }

    public override void OnDisconnected(ISocketContext<CommandLineMessage> context)
    {
        _logger.LogInformation("client disconnected from {0}", context.RemoteEndPoint);
        base.OnDisconnected(context);
    }

    public override void OnException(ISocketContext<CommandLineMessage> context, Exception ex)
    {
        _logger.LogError(ex,"client from {0}, occ error {1}", context.RemoteEndPoint,ex.Message);
        base.OnException(context, ex);
    }

    public override void OnReceive(ISocketContext<CommandLineMessage> context, CommandLineMessage msg)
    {
        string replyMessage = string.Empty;
        string replyCmd = string.Empty;
        switch (msg.Command)
        {
            case "echo":
                replyMessage = msg.Parameters[0];
                replyCmd = "echo";
                break;
            case "init":
                replyMessage = "ok";
                replyCmd = "init_reply";

                break;
            default:
                replyMessage = "error unknow command";
                break;
        }


        Task.Run(async () =>
        {
            await context.SendAsync(new CommandLineMessage(replyCmd, replyMessage));
        });
    }

}

2. 掛載服務

服務預設掛載在5566埠

static void Main(string[] args)
{
    var builder = new HostBuilder()          
    .ConfigureServices((context,services) =>
    {
        //協議
        services.AddSingleton<IProtocol<CommandLineMessage>, CommandLineProtocol>();
        //掛載服務邏輯
        services.AddSingleton<ISocketService<CommandLineMessage>, MyService>();
        //添加掛載的宿主服務
        services.AddTcpServer<CommandLineMessage>();
    })
    .ConfigureLogging(
        logger =>
        {                   
            logger.AddConsole();
        }
    );
    builder.RunConsoleAsync().Wait();
}

2. 客戶端

2.1 使用內置的TcpClient

監聽接收消息和鏈接的消息,如下所示:

TcpClient<CommandLineMessage> client = new TcpClient<CommandLineMessage>(new CommandLineProtocol());
client.OnReceived += Client_OnReceived;
client.OnConnected += Client_OnConnected;

Task.Run(async () =>
{
    //連接伺服器
    var socketContext = await client.ConnectAsync(new IPEndPoint(Hey.IPUtility.GetLocalIntranetIP(), 5566));
    //發送消息
    var initCmd = new Hey.Messaging.CommandLineMessage("init");
    await socketContext.SendAsync(initCmd);
}).Wait();

可用的事件:

  • OnReceived 當收到服務端消息時
  • OnError 當通訊發生異常時
  • OnConnected 當連接上伺服器時
  • OnDisconnected 當與服務端斷開鏈接時
  • OnIdleState 鏈接閑置時觸發,一般在此事件中發送心跳包

3. 自定義協議

Peach支持使用自定義協議,擴展協議需要自行實現兩個介面:

3.1. IMessage 介面

實現類具體實現通訊消息的內容載體,只需實現如何獲取消息長度的屬性

public interface IMessage
{
    int Length { get;  }
}

3.2. IProtocol 介面

實現類需要描述消息頭信息和具體打包解包邏輯,頭信息描述參見ProtocolMeta欄位描述

/// <summary>
/// 協議介面
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public interface IProtocol<TMessage>
    where TMessage :  Messaging.IMessage
{
    ProtocolMeta GetProtocolMeta();

    TMessage Parse(Buffer.IBufferReader reader);

    void Pack(Buffer.IBufferWriter writer, TMessage message);

}

3.3 Amp協議

為了更好讓讀者理解自定義協議的操作,這裡以DotBPE中的Amp協議為例,來具體講解一下,先來看下Amp協議的說明:

      0        1 2 3 4   5 6 7 8     9     10 11 12 13   1415      16171819    20    <length>-21
+------------+----------+---------+------+-------------+---------+---------+--------+------------+
| <ver/argc> | <length> |  <seq>  |<type>| <serviceId> | <msgId> |  <code> | <codec>|   <data>   |
+------------+----------+---------+------+-------------+---------+---------+--------+------------+

Amp協議固定包頭上21個位元組,說明如下:

  • ver/argc = 版本 固定填1
  • length = 為總包長
  • seq = 請求序列號
  • type = 消息類型
    • 1 = Request 請求消息
    • 2 = Response 響應消息
    • 3 = Notify 通知消息
    • 4 = OneWayRequest 調用不關心返回值
  • serId = serviceId 服務號
  • msgId = msgId 消息ID
  • code = 當 type = 0 (請求時)固定傳0 ,其他即為響應碼,如果響應碼不為0 則認為請求失敗,具體錯誤碼再定義
  • codecType = 編碼方式 0=預設 Protobuf 1=MessagePack 2=JSON
  • data = 實際的業務數據

3.3.1 AmpMessage實現

為了避免干擾因素,這裡的代碼去除了一些,輔助行的欄位和方法,AmpMessage其實是主要用於描述頭信息的,並且包含body的buffer數據 Data欄位,並實現獲取消息體Length的方法(用於發送消息時,計算緩衝區)

 public class AmpMessage :  Peach.Messaging.IMessage
    {
       /// <summary>
        /// 第一個版本為18個位元組頭固定長度
        /// </summary>
        public const int VERSION_0_HEAD_LENGTH = 18;
        /// <summary>
        /// 現有版本21個位元組頭固定長度
        /// </summary>
        public const int VERSION_1_HEAD_LENGTH = 21;
        /// <summary>
        /// 狀態碼
        /// </summary>
        public int Code { get; set; }

        //0 預設為Protobuf 1 MessagePack 2 = JSON
        public CodecType CodecType { get; set; }

        /// <summary>
        /// 實際的請求數據
        /// </summary>
        public byte[] Data { get; set; }

        public int Length {
            get
            {
                var hl = Version == 0 ? VERSION_0_HEAD_LENGTH : VERSION_1_HEAD_LENGTH;
                if(Data == null)
                {
                    return hl;
                }

                return hl + this.Data.Length;
            }
        }
        
        /// <summary>
        /// 消息標識
        /// </summary>
        public string Id => $"{ServiceId}|{MessageId}|{Sequence}";
        /// <summary>
        /// 調用服務的唯一消息號 確定哪個方法
        /// </summary>
        public ushort MessageId { get; set; }
        /// <summary>
        /// 請求的序列號
        /// </summary>
        public int Sequence { get; set; }
        /// <summary>
        /// 調用服務的唯一服務號 確定哪個服務
        /// </summary>
        public int ServiceId { get; set; }

        /// <summary>
        /// 協議版本0/1
        /// </summary>
        public byte Version { get; set; }

        public InvokeMessageType InvokeMessageType { get; set; }
    }
    
    public enum InvokeMessageType : byte
    {
        Request = 1,
        Response = 2,
        Notify = 3,
        OnewayRequest=4 //請求且不等待回覆
    }

3.3.2 AmpProtocol的實現

AmpProtocol中的實現主要是對ProtocolMeta描述,代碼中已有詳細註釋,至於打包和解包,就是根據協議Write或者Read對應的數據類型即可

/// <summary>
    /// Amp Protocol
    /// </summary>
    public class AmpProtocol : IProtocol<AmpMessage>
    {
        private readonly ISerializer _serializer;

        public AmpProtocol(ISerializer serializer)
        {
            this._serializer = serializer;
        }

        static readonly ProtocolMeta AMP_PROTOCOL_META = new ProtocolMeta
        {
            InitialBytesToStrip = 0, //讀取時需要跳過的位元組數
            LengthAdjustment = -5, //包實際長度的糾正,如果包長包括包頭和包體,則要減去Length之前的部分
            LengthFieldLength = 4, //長度欄位的位元組數 整型為4個位元組
            LengthFieldOffset = 1, //長度屬性的起始(偏移)位
            MaxFrameLength = int.MaxValue, //最大的數據包位元組數
            HeartbeatInterval = 30 * 1000 // 30秒沒消息發一個心跳包
        };

        public ProtocolMeta GetProtocolMeta()
        {
            return AMP_PROTOCOL_META;
        }

        public void Pack(IBufferWriter writer, AmpMessage message)
        {
            writer.WriteByte(message.Version);
            writer.WriteInt(message.Length);
            writer.WriteInt(message.Sequence);
            writer.WriteByte((byte)message.InvokeMessageType);

            if (message.Version == 0)
            {
                writer.WriteUShort((ushort)message.ServiceId);
            }
            else
            {
                writer.WriteInt(message.ServiceId);
            }
            writer.WriteUShort(message.MessageId);
            writer.WriteInt(message.Code);
            if(message.Version == 1)
            {
                writer.WriteByte(_serializer.CodecType);
            }

            if (message.Data != null)
            {
                writer.WriteBytes(message.Data);
            }
        }

        public AmpMessage Parse(IBufferReader reader)
        {
            if (reader.ReadableBytes == 0)
            {
                return null;
            }

            var msg = new AmpMessage {Version = reader.ReadByte()};

            int headLength;
            if (msg.Version == 0 )
            {
                headLength = AmpMessage.VERSION_0_HEAD_LENGTH;
                if (reader.ReadableBytes < AmpMessage.VERSION_0_HEAD_LENGTH - 1)
                {
                    throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_0_HEAD_LENGTH}");
                }
            }
            else if (msg.Version == 1 )
            {
                headLength = AmpMessage.VERSION_1_HEAD_LENGTH;
                if (reader.ReadableBytes < AmpMessage.VERSION_1_HEAD_LENGTH - 1)
                {
                    throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_1_HEAD_LENGTH}");
                }
            }
            else
            {
                throw new RpcCodecException($"decode error ,{msg.Version} is not support");
            }

            var length = reader.ReadInt();
            msg.Sequence = reader.ReadInt();
            var type = reader.ReadByte();
            msg.InvokeMessageType = (InvokeMessageType)Enum.ToObject(typeof(InvokeMessageType), type);


            msg.ServiceId = msg.Version == 0 ? reader.ReadUShort() : reader.ReadInt();


            msg.MessageId = reader.ReadUShort();
            msg.Code = reader.ReadInt();

            if (msg.Version == 1)
            {
                byte codeType = reader.ReadByte();
                if (codeType != this._serializer.CodecType)
                {
                    throw  new RpcCodecException($"CodecType:{codeType} is not Match {this._serializer.CodecType}");
                }
                msg.CodecType = (CodecType)Enum.ToObject(typeof(CodecType), codeType);
            }
            else
            {
                msg.CodecType = CodecType.Protobuf;
            }

            int left = length - headLength;
            if (left > 0)
            {
                if (left > reader.ReadableBytes)
                {
                    throw new RpcCodecException("message not long enough!");
                }
                msg.Data = new byte[left];
                reader.ReadBytes(msg.Data);
            }
            return msg;
        }
    }
}

4. 後記

Peach的產生主要是源於對DotBPE的重構,因為在其他項目中有關於通訊的其他需求,所以這塊雖然比較簡單,也可比較獨立,所以單獨開了一個庫來實現對DotNetty的封裝。另外歡迎各位dotnet core的同學一起學習交流 QQ群號:699044833


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 綜合網上資源完成的自己的第一篇博客 網上類似的貼子挺多的,由於情況不太一樣。網上相關帖子都是在 MainWindow 嵌入。我需要在原有客戶端上開發新的插件即用戶控制項庫實現嵌入外部exe。 主要問題:獲取不到視窗句柄。 1、利用系統API實現嵌入。 2、當時在獲取頁面(用戶控制項庫)的句柄問題上碰壁, ...
  • 下載了codesmith 8,連接Mysql卻提示“找不到請求的 .Net Framework Data Provider"。 1,下載MySql.Data.dll:https://dev.mysql.com/downloads/windows/visualstudio/ 下載zip格式的即可,解壓 ...
  • 今天這篇文章我將通過實例代碼帶著大家一步一步通過abp vNext這個asp.net core的快速開發框架來進行Quartz.net定時任務調度的管理界面的開發。大伙最好跟著一起敲一下代碼,當然源碼我會上傳到github上,有興趣的小伙伴可以在文章底部查看源碼鏈接。 作者:依樂祝 原文鏈接:htt ...
  • 在實際業務系統的開發中,往往需要幾個系統協同一起出來同一個資料庫的數據,資料庫可以是同一個資料庫,也可以根據業務拆分的多個資料庫,如我們企業微信的應用、後臺業務管理系統、以及專門為數據提供服務的API服務介面等,這樣可以極大程度上給我們的業務數據提供支撐,並根據不同的特性進行分開管理和維護。 ...
  • 在開發業務管理系統的時候,往往涉及到資產信息及編碼的列印處理,如我們需要對資產信息、條形碼、二維碼一起列印,以便貼在具體資產信息上面,方便微信公眾號、企業微信進行業務處理,那麼編碼的列印就很有必要了,本篇隨筆介紹實際生產環境中的資產編碼列印處理。 ...
  • 一. 原生SQL查詢 接著上篇講。通過 Entity Framework Core 可以在使用關係資料庫時下降到原始 SQL 查詢。 在無法使用 LINQ 表達要執行的查詢時,或因使用 LINQ 查詢而導致低效的 SQL 查詢時非常有用。 原始 SQL 查詢可返回實體類型,或者從 EF Core 2 ...
  • 因為工作需要調用WebService介面,查了下資料,發現添加服務引用可以直接調用websevice 參考地址:https://www.cnblogs.com/peterpc/p/4628441.html 如果不添加服務引用又怎麼做呢?於是又去查看怎麼根據http協議調用webservice並做了個 ...
  • Insql 國人開發,是一款汲取 Mybatis 優點的.NET ORM 框架。追求簡單直觀,使用自由靈活等特點。 項目主頁: "https://rainrcn.github.io/insql" 此 ORM 是以 Mybatis 的 Sql 配置方式,以 Dapper 為對象映射的基礎上建立。喜歡寫 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...