Peach是基於[DotNetty][1]的Socket網路通訊幫助類庫,可以幫助開發者簡化使用DotNetty,關於DotNetty可參考我之前的[這篇文章][2]。 Peach內置實現了一個基於文本協議的CommandLineProtocol,下麵的實例以這個協議展開,最後以[DotBPE][... ...
Peach是基於DotNetty的Socket網路通訊幫助類庫,可以幫助開發者簡化使用DotNetty,關於DotNetty可參考我之前的這篇文章。
Peach內置實現了一個基於文本協議的CommandLineProtocol,下麵的實例以這個協議展開,最後以DotBPE中Amp協議來介紹下如何擴展自定義協議。
Github地址: https://github.com/xuanye/Peach
QuickStart 使用
添加引用
dotnet nuget add Peach
要使用Peach編寫網路程式,一般只需要三個步驟
- 實現協議傳輸消息IMessage
- 實現協議打包和解包邏輯IProtocol
- 實現ISocketService完成服務端邏輯編寫
在快速開始的實例中,我們使用內置的CommandLineProtocol,所以省去了步驟1,2讓我們開始吧!
1 服務端
1.1 實現MyService
可分別重寫
OnConnected
有客戶端連接上的事件
OnDisConnected
客戶端斷開連接時的事件OnReceive
收到客戶端消息的事件OnException
發生異常時的事件,如異常斷開
public class MyService : Peach.AbsSocketService<Peach.Messaging.CommandLineMessage>
{
private readonly ILogger<MyService> _logger;
public MyService(ILogger<MyService> logger)
{
_logger = logger;
}
public override void OnConnected(ISocketContext<CommandLineMessage> context)
{
_logger.LogInformation("client connected from {0}", context.RemoteEndPoint);
base.OnConnected(context);
}
public override void OnDisconnected(ISocketContext<CommandLineMessage> context)
{
_logger.LogInformation("client disconnected from {0}", context.RemoteEndPoint);
base.OnDisconnected(context);
}
public override void OnException(ISocketContext<CommandLineMessage> context, Exception ex)
{
_logger.LogError(ex,"client from {0}, occ error {1}", context.RemoteEndPoint,ex.Message);
base.OnException(context, ex);
}
public override void OnReceive(ISocketContext<CommandLineMessage> context, CommandLineMessage msg)
{
string replyMessage = string.Empty;
string replyCmd = string.Empty;
switch (msg.Command)
{
case "echo":
replyMessage = msg.Parameters[0];
replyCmd = "echo";
break;
case "init":
replyMessage = "ok";
replyCmd = "init_reply";
break;
default:
replyMessage = "error unknow command";
break;
}
Task.Run(async () =>
{
await context.SendAsync(new CommandLineMessage(replyCmd, replyMessage));
});
}
}
2. 掛載服務
服務預設掛載在5566埠
static void Main(string[] args)
{
var builder = new HostBuilder()
.ConfigureServices((context,services) =>
{
//協議
services.AddSingleton<IProtocol<CommandLineMessage>, CommandLineProtocol>();
//掛載服務邏輯
services.AddSingleton<ISocketService<CommandLineMessage>, MyService>();
//添加掛載的宿主服務
services.AddTcpServer<CommandLineMessage>();
})
.ConfigureLogging(
logger =>
{
logger.AddConsole();
}
);
builder.RunConsoleAsync().Wait();
}
2. 客戶端
2.1 使用內置的TcpClient
監聽接收消息和鏈接的消息,如下所示:
TcpClient<CommandLineMessage> client = new TcpClient<CommandLineMessage>(new CommandLineProtocol());
client.OnReceived += Client_OnReceived;
client.OnConnected += Client_OnConnected;
Task.Run(async () =>
{
//連接伺服器
var socketContext = await client.ConnectAsync(new IPEndPoint(Hey.IPUtility.GetLocalIntranetIP(), 5566));
//發送消息
var initCmd = new Hey.Messaging.CommandLineMessage("init");
await socketContext.SendAsync(initCmd);
}).Wait();
可用的事件:
OnReceived
當收到服務端消息時OnError
當通訊發生異常時OnConnected
當連接上伺服器時OnDisconnected
當與服務端斷開鏈接時OnIdleState
鏈接閑置時觸發,一般在此事件中發送心跳包
3. 自定義協議
Peach支持使用自定義協議,擴展協議需要自行實現兩個介面:
3.1. IMessage 介面
實現類具體實現通訊消息的內容載體,只需實現如何獲取消息長度的屬性
public interface IMessage
{
int Length { get; }
}
3.2. IProtocol 介面
實現類需要描述消息頭信息和具體打包解包邏輯,頭信息描述參見ProtocolMeta
欄位描述
/// <summary>
/// 協議介面
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public interface IProtocol<TMessage>
where TMessage : Messaging.IMessage
{
ProtocolMeta GetProtocolMeta();
TMessage Parse(Buffer.IBufferReader reader);
void Pack(Buffer.IBufferWriter writer, TMessage message);
}
3.3 Amp協議
為了更好讓讀者理解自定義協議的操作,這裡以DotBPE中的Amp協議為例,來具體講解一下,先來看下Amp協議的說明:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 1415 16171819 20 <length>-21
+------------+----------+---------+------+-------------+---------+---------+--------+------------+
| <ver/argc> | <length> | <seq> |<type>| <serviceId> | <msgId> | <code> | <codec>| <data> |
+------------+----------+---------+------+-------------+---------+---------+--------+------------+
Amp協議固定包頭上21個位元組,說明如下:
- ver/argc = 版本 固定填1
- length = 為總包長
- seq = 請求序列號
- type = 消息類型
- 1 = Request 請求消息
- 2 = Response 響應消息
- 3 = Notify 通知消息
- 4 = OneWayRequest 調用不關心返回值
- serId = serviceId 服務號
- msgId = msgId 消息ID
- code = 當 type = 0 (請求時)固定傳0 ,其他即為響應碼,如果響應碼不為0 則認為請求失敗,具體錯誤碼再定義
- codecType = 編碼方式 0=預設 Protobuf 1=MessagePack 2=JSON
- data = 實際的業務數據
3.3.1 AmpMessage實現
為了避免干擾因素,這裡的代碼去除了一些,輔助行的欄位和方法,AmpMessage其實是主要用於描述頭信息的,並且包含body的buffer數據 Data
欄位,並實現獲取消息體Length的方法(用於發送消息時,計算緩衝區)
public class AmpMessage : Peach.Messaging.IMessage
{
/// <summary>
/// 第一個版本為18個位元組頭固定長度
/// </summary>
public const int VERSION_0_HEAD_LENGTH = 18;
/// <summary>
/// 現有版本21個位元組頭固定長度
/// </summary>
public const int VERSION_1_HEAD_LENGTH = 21;
/// <summary>
/// 狀態碼
/// </summary>
public int Code { get; set; }
//0 預設為Protobuf 1 MessagePack 2 = JSON
public CodecType CodecType { get; set; }
/// <summary>
/// 實際的請求數據
/// </summary>
public byte[] Data { get; set; }
public int Length {
get
{
var hl = Version == 0 ? VERSION_0_HEAD_LENGTH : VERSION_1_HEAD_LENGTH;
if(Data == null)
{
return hl;
}
return hl + this.Data.Length;
}
}
/// <summary>
/// 消息標識
/// </summary>
public string Id => $"{ServiceId}|{MessageId}|{Sequence}";
/// <summary>
/// 調用服務的唯一消息號 確定哪個方法
/// </summary>
public ushort MessageId { get; set; }
/// <summary>
/// 請求的序列號
/// </summary>
public int Sequence { get; set; }
/// <summary>
/// 調用服務的唯一服務號 確定哪個服務
/// </summary>
public int ServiceId { get; set; }
/// <summary>
/// 協議版本0/1
/// </summary>
public byte Version { get; set; }
public InvokeMessageType InvokeMessageType { get; set; }
}
public enum InvokeMessageType : byte
{
Request = 1,
Response = 2,
Notify = 3,
OnewayRequest=4 //請求且不等待回覆
}
3.3.2 AmpProtocol的實現
AmpProtocol中的實現主要是對ProtocolMeta描述,代碼中已有詳細註釋,至於打包和解包,就是根據協議Write或者Read對應的數據類型即可
/// <summary>
/// Amp Protocol
/// </summary>
public class AmpProtocol : IProtocol<AmpMessage>
{
private readonly ISerializer _serializer;
public AmpProtocol(ISerializer serializer)
{
this._serializer = serializer;
}
static readonly ProtocolMeta AMP_PROTOCOL_META = new ProtocolMeta
{
InitialBytesToStrip = 0, //讀取時需要跳過的位元組數
LengthAdjustment = -5, //包實際長度的糾正,如果包長包括包頭和包體,則要減去Length之前的部分
LengthFieldLength = 4, //長度欄位的位元組數 整型為4個位元組
LengthFieldOffset = 1, //長度屬性的起始(偏移)位
MaxFrameLength = int.MaxValue, //最大的數據包位元組數
HeartbeatInterval = 30 * 1000 // 30秒沒消息發一個心跳包
};
public ProtocolMeta GetProtocolMeta()
{
return AMP_PROTOCOL_META;
}
public void Pack(IBufferWriter writer, AmpMessage message)
{
writer.WriteByte(message.Version);
writer.WriteInt(message.Length);
writer.WriteInt(message.Sequence);
writer.WriteByte((byte)message.InvokeMessageType);
if (message.Version == 0)
{
writer.WriteUShort((ushort)message.ServiceId);
}
else
{
writer.WriteInt(message.ServiceId);
}
writer.WriteUShort(message.MessageId);
writer.WriteInt(message.Code);
if(message.Version == 1)
{
writer.WriteByte(_serializer.CodecType);
}
if (message.Data != null)
{
writer.WriteBytes(message.Data);
}
}
public AmpMessage Parse(IBufferReader reader)
{
if (reader.ReadableBytes == 0)
{
return null;
}
var msg = new AmpMessage {Version = reader.ReadByte()};
int headLength;
if (msg.Version == 0 )
{
headLength = AmpMessage.VERSION_0_HEAD_LENGTH;
if (reader.ReadableBytes < AmpMessage.VERSION_0_HEAD_LENGTH - 1)
{
throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_0_HEAD_LENGTH}");
}
}
else if (msg.Version == 1 )
{
headLength = AmpMessage.VERSION_1_HEAD_LENGTH;
if (reader.ReadableBytes < AmpMessage.VERSION_1_HEAD_LENGTH - 1)
{
throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_1_HEAD_LENGTH}");
}
}
else
{
throw new RpcCodecException($"decode error ,{msg.Version} is not support");
}
var length = reader.ReadInt();
msg.Sequence = reader.ReadInt();
var type = reader.ReadByte();
msg.InvokeMessageType = (InvokeMessageType)Enum.ToObject(typeof(InvokeMessageType), type);
msg.ServiceId = msg.Version == 0 ? reader.ReadUShort() : reader.ReadInt();
msg.MessageId = reader.ReadUShort();
msg.Code = reader.ReadInt();
if (msg.Version == 1)
{
byte codeType = reader.ReadByte();
if (codeType != this._serializer.CodecType)
{
throw new RpcCodecException($"CodecType:{codeType} is not Match {this._serializer.CodecType}");
}
msg.CodecType = (CodecType)Enum.ToObject(typeof(CodecType), codeType);
}
else
{
msg.CodecType = CodecType.Protobuf;
}
int left = length - headLength;
if (left > 0)
{
if (left > reader.ReadableBytes)
{
throw new RpcCodecException("message not long enough!");
}
msg.Data = new byte[left];
reader.ReadBytes(msg.Data);
}
return msg;
}
}
}
4. 後記
Peach的產生主要是源於對DotBPE的重構,因為在其他項目中有關於通訊的其他需求,所以這塊雖然比較簡單,也可比較獨立,所以單獨開了一個庫來實現對DotNetty的封裝。另外歡迎各位dotnet core的同學一起學習交流 QQ群號:699044833