一、DotNetty背景介紹 某天發現 dotnet 是個好東西,就找了個項目來練練手。於是有了本文的 Mqtt 客戶端 (github: MqttFx ) DotNetty是微軟的Azure團隊,使用C#實現的Netty的版本發佈。不但使用了C#和.Net平臺的技術特點,並且保留了Netty原來絕 ...
一、DotNetty背景介紹
某天發現 dotnet 是個好東西,就找了個項目來練練手。於是有了本文的 Mqtt 客戶端 (github: MqttFx )
DotNetty是微軟的Azure團隊,使用C#實現的Netty的版本發佈。不但使用了C#和.Net平臺的技術特點,並且保留了Netty原來絕大部分的編程介面。讓我們在使用時,完全可以依照Netty官方的教程來學習和使用DotNetty應用程式。
DotNetty同時也是開源的,它的源代碼托管在Github上: https://github.com/azure/dotnetty
Netty 的官方文檔 : http://netty.io/wiki/all-documents.html
二、Packet
套件里是有個 DotNetty.Codecs.Mqtt, 本項目沒有使用。直接寫了一個。
FixedHeader: 固定報頭
/// <summary> /// 固定報頭 /// </summary> public class FixedHeader { /// <summary> /// 報文類型 /// </summary> public PacketType PacketType { get; set; } /// <summary> /// 重發標誌 /// </summary> public bool Dup { get; set; } /// <summary> /// 服務質量等級 /// </summary> public MqttQos Qos { get; set; } /// <summary> /// 保留標誌 /// </summary> public bool Retain { get; set; } /// <summary> /// 剩餘長度 /// </summary> public int RemaingLength { internal get; set; } public FixedHeader(PacketType packetType) { PacketType = packetType; } public FixedHeader(byte signature, int remainingLength) { PacketType = (PacketType)((signature & 0xf0) >> 4); Dup = ((signature & 0x08) >> 3) > 0; Qos = (MqttQos)((signature & 0x06) >> 1); Retain = (signature & 0x01) > 0; RemaingLength = remainingLength; } public void WriteTo(IByteBuffer buffer) { var flags = (byte)PacketType << 4; flags |= Dup.ToByte() << 3; flags |= (byte)Qos << 1; flags |= Retain.ToByte(); buffer.WriteByte((byte)flags); buffer.WriteBytes(EncodeLength(RemaingLength)); } static byte[] EncodeLength(int length) { var result = new List<byte>(); do { var digit = (byte)(length % 0x80); length /= 0x80; if (length > 0) digit |= 0x80; result.Add(digit); } while (length > 0); return result.ToArray(); } }
Packet: 消息基類
/// <summary> /// 消息基類 /// </summary> public abstract class Packet { #region FixedHeader /// <summary> /// 固定報頭 /// </summary> public FixedHeader FixedHeader { protected get; set; } /// <summary> /// 報文類型 /// </summary> public PacketType PacketType => FixedHeader.PacketType; /// <summary> /// 重發標誌 /// </summary> public bool Dup => FixedHeader.Dup; /// <summary> /// 服務質量等級 /// </summary> public MqttQos Qos => FixedHeader.Qos; /// <summary> /// 保留標誌 /// </summary> public bool Retain => FixedHeader.Retain; /// <summary> /// 剩餘長度 /// </summary> public int RemaingLength => FixedHeader.RemaingLength; #endregion public Packet(PacketType packetType) => FixedHeader = new FixedHeader(packetType); public virtual void Encode(IByteBuffer buffer) { } public virtual void Decode(IByteBuffer buffer) { } }
PacketWithId: 消息基類(帶ID)
/// <summary> /// 消息基類(帶ID) /// </summary> public abstract class PacketWithId : Packet { public PacketWithId(PacketType packetType) : base(packetType) { } /// <summary> /// 報文標識符 /// </summary> public ushort PacketId { get; set; } /// <summary> /// EncodePacketIdVariableHeader /// </summary> /// <param name="buffer"></param> public override void Encode(IByteBuffer buffer) { var buf = Unpooled.Buffer(); try { EncodePacketId(buf); FixedHeader.RemaingLength = buf.ReadableBytes; FixedHeader.WriteTo(buffer); buffer.WriteBytes(buf); buf = null; } finally { buf?.Release(); } } /// <summary> /// DecodePacketIdVariableHeader /// </summary> /// <param name="buffer"></param> public override void Decode(IByteBuffer buffer) { int remainingLength = RemaingLength; DecodePacketId(buffer, ref remainingLength); FixedHeader.RemaingLength = remainingLength; } protected void EncodePacketId(IByteBuffer buffer) { if (Qos > MqttQos.AtMostOnce) { buffer.WriteUnsignedShort(PacketId); } } protected void DecodePacketId(IByteBuffer buffer, ref int remainingLength) { if (Qos > MqttQos.AtMostOnce) { PacketId = buffer.ReadUnsignedShort(ref remainingLength); if (PacketId == 0) throw new DecoderException("[MQTT-2.3.1-1]"); } } }
ConnectPacket: 發起連接包
/// <summary> /// 發起連接 /// </summary> internal sealed class ConnectPacket : Packet { public ConnectPacket() : base(PacketType.CONNECT) { } #region Variable header /// <summary> /// 協議名 /// </summary> public string ProtocolName { get; } = "MQTT"; /// <summary> /// 協議級別 /// </summary> public byte ProtocolLevel { get; } = 0x04; /// <summary> /// 保持連接 /// </summary> public short KeepAlive { get; set; } #region Connect Flags /// <summary> /// 用戶名標誌 /// </summary> public bool UsernameFlag { get; set; } /// <summary> /// 密碼標誌 /// </summary> public bool PasswordFlag { get; set; } /// <summary> /// 遺囑保留 /// </summary> public bool WillRetain { get; set; } /// <summary> /// 遺囑QoS /// </summary> public MqttQos WillQos { get; set; } /// <summary> /// 遺囑標誌 /// </summary> public bool WillFlag { get; set; } /// <summary> /// 清理會話 /// </summary> public bool CleanSession { get; set; } #endregion #endregion #region Payload /// <summary> /// 客戶端標識符 Client Identifier /// </summary> public string ClientId { get; set; } /// <summary> /// 遺囑主題 Will Topic /// </summary> public string WillTopic { get; set; } /// <summary> /// 遺囑消息 Will Message /// </summary> public byte[] WillMessage { get; set; } /// <summary> /// 用戶名 User Name /// </summary> public string UserName { get; set; } /// <summary> /// 密碼 Password /// </summary> public string Password { get; set; } #endregion public override void Encode(IByteBuffer buffer) { var buf = Unpooled.Buffer(); try { //variable header buf.WriteString(ProtocolName); //byte 1 - 8 buf.WriteByte(ProtocolLevel); //byte 9 //connect flags; //byte 10 var flags = UsernameFlag.ToByte() << 7; flags |= PasswordFlag.ToByte() << 6; flags |= WillRetain.ToByte() << 5; flags |= ((byte)WillQos) << 3; flags |= WillFlag.ToByte() << 2; flags |= CleanSession.ToByte() << 1; buf.WriteByte((byte)flags); //keep alive buf.WriteShort(KeepAlive); //byte 11 - 12 //payload buf.WriteString(ClientId); if (WillFlag) { buf.WriteString(WillTopic); buf.WriteBytes(WillMessage); } if (UsernameFlag && PasswordFlag) { buf.WriteString(UserName); buf.WriteString(Password); } FixedHeader.RemaingLength = buf.ReadableBytes; FixedHeader.WriteTo(buffer); buffer.WriteBytes(buf); } finally { buf?.Release(); buf = null; } } }
連接回執: ConnAckPacket
/// <summary> /// 連接回執 /// </summary> internal sealed class ConnAckPacket : Packet { public ConnAckPacket() : base (PacketType.CONNACK) { } /// <summary> /// 當前會話 /// </summary> public bool SessionPresent { get; set; } /// <summary> /// 連接返回碼 /// </summary> public ConnectReturnCode ConnectReturnCode { get; set; } public override void Decode(IByteBuffer buffer) { SessionPresent = (buffer.ReadByte() & 0x01) == 1; ConnectReturnCode = (ConnectReturnCode)buffer.ReadByte(); } }View Code
剩餘幾個包,,大家看看源碼。
三、包解碼編碼 MqttDecoder MqttEncoder
粘包拆包問題是處於網路比較底層的問題,在數據鏈路層、網路層以及傳輸層都有可能發生。我們日常的網路應用開發大都在傳輸層進行,由於UDP有消息保護邊界,不會發生這個問題。
什麼是粘包、拆包?
對於什麼是粘包、拆包問題,我想先舉兩個簡單的應用場景:
-
客戶端和伺服器建立一個連接,客戶端發送一條消息,客戶端關閉與服務端的連接。
-
客戶端和伺服器簡歷一個連接,客戶端連續發送兩條消息,客戶端關閉與服務端的連接。
對於第一種情況,服務端的處理流程可以是這樣的:當客戶端與服務端的連接建立成功之後,服務端不斷讀取客戶端發送過來的數據,當客戶端與服務端連接斷開之後,服務端知道已經讀完了一條消息,然後進行解碼和後續處理...。對於第二種情況,如果按照上面相同的處理邏輯來處理,那就有問題了,我們來看看第二種情況下客戶端發送的兩條消息遞交到服務端有可能出現的情況:
第一種情況:
服務端一共讀到兩個數據包,第一個包包含客戶端發出的第一條消息的完整信息,第二個包包含客戶端發出的第二條消息,那這種情況比較好處理,伺服器只需要簡單的從網路緩衝區去讀就好了,第一次讀到第一條消息的完整信息,消費完再從網路緩衝區將第二條完整消息讀出來消費。
沒有發生粘包、拆包示意圖
第二種情況:
服務端一共就讀到一個數據包,這個數據包包含客戶端發出的兩條消息的完整信息,這個時候基於之前邏輯實現的服務端就蒙了,因為服務端不知道第一條消息從哪兒結束和第二條消息從哪兒開始,這種情況其實是發生了TCP粘包。
TCP粘包示意圖
第三種情況:
服務端一共收到了兩個數據包,第一個數據包只包含了第一條消息的一部分,第一條消息的後半部分和第二條消息都在第二個數據包中,或者是第一個數據包包含了第一條消息的完整信息和第二條消息的一部分信息,第二個數據包包含了第二條消息的剩下部分,這種情況其實是發送了TCP拆,因為發生了一條消息被拆分在兩個包裡面發送了,同樣上面的伺服器邏輯對於這種情況是不好處理的。
TCP拆包示意圖
為什麼會發生TCP粘包、拆包呢?
發生TCP粘包、拆包主要是由於下麵一些原因:
-
應用程式寫入的數據大於套接字緩衝區大小,這將會發生拆包。
-
應用程式寫入數據小於套接字緩衝區大小,網卡將應用多次寫入的數據發送到網路上,這將會發生粘包。
-
進行MSS(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>MSS的時候將發生拆包。
-
接收方法不及時讀取套接字緩衝區數據,這將發生粘包。
-
……
如何處理粘包、拆包問題?
知道了粘包、拆包問題及根源,那麼如何處理粘包、拆包問題呢?TCP本身是面向流的,作為網路伺服器,如何從這源源不斷涌來的數據流中拆分出或者合併出有意義的信息呢?通常會有以下一些常用的方法:
-
使用帶消息頭的協議、消息頭存儲消息開始標識及消息長度信息,服務端獲取消息頭的時候解析出消息長度,然後向後讀取該長度的內容。
-
設置定長消息,服務端每次讀取既定長度的內容作為一條完整消息。
-
設置消息邊界,服務端從網路流中按消息編輯分離出消息內容。
-
……
如何基於DotNetty處理粘包、拆包問題?
ChannelPipeline 網路層數據的流向
ChannelHandler 組件對網路數據的處理
-
ByteToMessageDecoder
-
MessageToMessageDecoder
這兩個組件都實現了ChannelInboundHandler介面,這說明這兩個組件都是用來解碼網路上過來的數據的。而他們的順序一般是ByteToMessageDecoder位於head channel handler的後面,MessageToMessageDecoder位於ByteToMessageDecoder的後面。DotNetty中,涉及到粘包、拆包的邏輯主要在ByteToMessageDecoder及其實現中。
ByteToMessageDecoder
顧名思義、ByteToMessageDecoder是用來將從網路緩衝區讀取的位元組轉換成有意義的消息對象的
當上面一個channel handler傳入的ByteBuf有數據的時候,這裡我們可以把in參數看成網路流,這裡有不斷的數據流入,而我們要做的就是從這個byte流中分離出message,然後把message添加給out。分開將一下代碼邏輯:
-
當out中有Message的時候,直接將out中的內容交給後面的channel handler去處理。
-
當用戶邏輯把當前channel handler移除的時候,立即停止對網路數據的處理。
-
記錄當前in中可讀位元組數。
-
decode是抽象方法,交給子類具體實現。
-
同樣判斷當前channel handler移除的時候,立即停止對網路數據的處理。
-
如果子類實現沒有分理出任何message的時候,且子類實現也沒有動bytebuf中的數據的時候,這裡直接跳出,等待後續有數據來了再進行處理。
-
如果子類實現沒有分理出任何message的時候,且子類實現動了bytebuf中的數據,則繼續迴圈,直到解析出message或者不在對bytebuf中數據進行處理為止。
-
如果子類實現解析出了message但是又沒有動bytebuf中的數據,那麼是有問題的,拋出異常。
-
如果標誌位只解碼一次,則退出。
可以知道,如果要實現具有處理粘包、拆包功能的子類,及decode實現,必須要遵守上面的規則,我們以實現處理第一部分的第二種粘包情況和第三種情況拆包情況的伺服器邏輯來舉例:
對於粘包情況的decode需要實現的邏輯對應於將客戶端發送的兩條消息都解析出來分為兩個message加入out,這樣的話callDecode只需要調用一次decode即可。
對於拆包情況的decode需要實現的邏輯主要對應於處理第一個數據包的時候第一次調用decode的時候out的size不變,從continue跳出並且由於不滿足繼續可讀而退出迴圈,處理第二個數據包的時候,對於decode的調用將會產生兩個message放入out,其中兩次進入callDecode上下文中的數據流將會合併為一個bytebuf和當前channel handler實例關聯,兩次處理完畢即清空這個bytebuf。
MqttDecoder : Mqtt 解碼器
public sealed class MqttDecoder : ByteToMessageDecoder { readonly bool _isServer; readonly int _maxMessageSize; public MqttDecoder(bool isServer, int maxMessageSize) { _isServer = isServer; _maxMessageSize = maxMessageSize; } protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { try { if (!TryDecodePacket(context, input, out Packet packet)) return; output.Add(packet); } catch (DecoderException) { input.SkipBytes(input.ReadableBytes); throw; } } bool TryDecodePacket(IChannelHandlerContext context, IByteBuffer buffer, out Packet packet) { if (!buffer.IsReadable(2)) { packet = null; return false; } byte signature = buffer.ReadByte(); if (!TryDecodeRemainingLength(buffer, out int remainingLength) || !buffer.IsReadable(remainingLength)) { packet = null; return false; } //DecodePacketInternal var fixedHeader = new FixedHeader(signature, remainingLength); switch (fixedHeader.PacketType) { case PacketType.CONNECT: packet = new ConnectPacket(); break; case PacketType.CONNACK: packet = new ConnAckPacket(); break; case PacketType.DISCONNECT: packet = new DisconnectPacket(); break; case PacketType.PINGREQ: packet = new PingReqPacket(); break; case PacketType.PINGRESP: packet = new PingRespPacket(); break; case PacketType.PUBACK: packet = new PubAckPacket(); break; case PacketType.PUBCOMP: packet = new PubCompPacket(); break; case PacketType.PUBLISH: packet = new PublishPacket(); break; case PacketType.PUBREC: packet = new PubRecPacket(); break; case PacketType.PUBREL: packet = new PubRelPacket(); break; case PacketType.SUBSCRIBE: packet = new SubscribePacket(); break; case PacketType.SUBACK: packet = new SubAckPacket(); break; case PacketType.UNSUBSCRIBE: packet = new UnsubscribePacket(); break; case PacketType.UNSUBACK: packet = new UnsubscribePacket(); break; default: throw new DecoderException("Unsupported Message Type"); } packet.FixedHeader = fixedHeader; packet.Decode(buffer); //if (remainingLength > 0) // throw new DecoderException($"Declared remaining length is bigger than packet data size by {remainingLength}."); return true; } bool TryDecodeRemainingLength(IByteBuffer buffer, out int value) { int readable = buffer.ReadableBytes; int result = 0; int multiplier = 1; byte digit; int read = 0; do { if (readable < read + 1) { value = default; return false; } digit = buffer.ReadByte(); result += (digit & 0x7f) * multiplier; multiplier <<= 7; read++; } while ((digit & 0x80) != 0 && read < 4); if (read == 4 && (digit & 0x80) != 0) throw new DecoderException("Remaining length exceeds 4 bytes in length"); int completeMessageSize = result + 1 + read; if (completeMessageSize > _maxMessageSize) throw new DecoderException("Message is too big: " + completeMessageSize); value = result; return true; } //static int DecodeRemainingLength(IByteBuffer buffer) //{ // byte encodedByte; // var multiplier = 1; // var remainingLength = 0; // do // { // encodedByte = buffer.ReadByte(); // remainingLength += (encodedByte & 0x7f) * multiplier; // multiplier *= 0x80; // } while ((encodedByte & 0x80) != 0); // return remainingLength; //} }
MqttEncoder: mqtt 編碼器
public sealed class MqttEncoder : MessageToMessageEncoder<Packet> { public static readonly MqttEncoder Instance = new MqttEncoder(); protected override void Encode(IChannelHandlerContext context, Packet message, List<object> output) => DoEncode(context.Allocator, message, output); public static void DoEncode(IByteBufferAllocator bufferAllocator, Packet packet, List<object> output) { IByteBuffer buffer = bufferAllocator.Buffer(); try { packet.Encode(buffer); output.Add(buffer); buffer = null; } finally { buffer?.SafeRelease(); } } }
未完待續。。。