長鏈接發送request/response時, 絕大部分包都是小包, 而每個小包都要消耗一個IP包, 成本大約是20-30us, 普通千兆網卡的pps大約是60Wpps, 所以想要提高長鏈接密集IO的應用性能, 需要做包的合併, 也稱為了scatter/gather io或者vector io.
On Linux, readv/writev exist precisely for this need: fewer system calls, fewer packets per second, higher NIC throughput. For how readv speeds up reads, look at Chen Shuo's use of readv in muduo: the idea is to put a 64KB array on the stack as the second buffer of readv, so that a single call can drain the socket receive buffer in most cases (see reference 5). I won't repeat that here; the focus of this post is how to do a gathering write with DotNetty.
First, there has to be a Channel&lt;IMessage&gt; acting as a write buffer, so that business code worries about business and the network layer worries about the network. If every business call did its own WriteAndFlushAsync, there would be virtually no opportunity to coalesce sends.
Then comes the main loop of the SendingLoop, which keeps calling TryRead on the Channel to pull messages, calls WriteAsync on each one, and flushes once every few packets. The same idea also shows up in Orleans' networking code.
```csharp
 1 public void RunSendLoopAsync(IChannel channel)
 2 {
 3     var allocator = channel.Allocator;
 4     var reader = this.queue.Reader;
 5     Task.Run(async () =>
 6     {
 7         while (!this.stop)
 8         {
 9             var more = await reader.WaitToReadAsync();
10             if (!more)
11             {
12                 break;
13             }
14
15             IOutboundMessage message = default;
16             var number = 0;
17             try
18             {
19                 while (number < 4 && reader.TryRead(out message))
20                 {
21                     Interlocked.Decrement(ref this.queueCount);
22                     var msg = message.Inner as IMessage;
23                     var buffer = msg.ToByteBuffer(allocator);
24                     await channel.WriteAsync(buffer);
25                     number++;
26                 }
27                 channel.Flush();
28                 number = 0;
29             }
30             catch (Exception e) when (message != default)
31             {
32                 logger.LogError("SendOutboundMessage Fail, SessionID:{0}, Exception:{1}",
33                     this.sessionID, e.Message);
34                 this.messageCenter.OnMessageFail(message);
35             }
36         }
37         this.logger.LogInformation("SessionID:{0}, SendingLoop Exit", this.sessionID);
38     });
39 }
```
Lines 19-27 are the key part: a flush is issued once every 4 packets, and the flush then triggers DotNetty's DoWrite:
```csharp
 1 protected override void DoWrite(ChannelOutboundBuffer input)
 2 {
 3     List<ArraySegment<byte>> sharedBufferList = null;
 4     try
 5     {
 6         while (true)
 7         {
 8             int size = input.Size;
 9             if (size == 0)
10             {
11                 // All written
12                 break;
13             }
14             long writtenBytes = 0;
15             bool done = false;
16
17             // Ensure the pending writes are made of ByteBufs only.
18             int maxBytesPerGatheringWrite = ((TcpSocketChannelConfig)this.config).GetMaxBytesPerGatheringWrite();
19             sharedBufferList = input.GetSharedBufferList(1024, maxBytesPerGatheringWrite);
20             int nioBufferCnt = sharedBufferList.Count;
21             long expectedWrittenBytes = input.NioBufferSize;
22             Socket socket = this.Socket;
23
24             List<ArraySegment<byte>> bufferList = sharedBufferList;
25             // Always us nioBuffers() to workaround data-corruption.
26             // See https://github.com/netty/netty/issues/2761
27             switch (nioBufferCnt)
28             {
29                 case 0:
30                     // We have something else beside ByteBuffers to write so fallback to normal writes.
31                     base.DoWrite(input);
32                     return;
33                 default:
34                     for (int i = this.Configuration.WriteSpinCount - 1; i >= 0; i--)
35                     {
36                         long localWrittenBytes = socket.Send(bufferList, SocketFlags.None, out SocketError errorCode);
37                         if (errorCode != SocketError.Success && errorCode != SocketError.WouldBlock)
38                         {
39                             throw new SocketException((int)errorCode);
40                         }
// ... (excerpt truncated)
```
In the DoWrite method of DotNetty's TcpSocketChannel class, line 19 fetches the pending data of the current ChannelOutboundBuffer as a List&lt;ArraySegment&lt;byte&gt;&gt;, and line 36 sends the whole list with a single Socket.Send call. That one call is the essence of the gathering write. With this in place, there is no need to build a CompositeByteBuffer at the business layer.
For the implementation in DotNetty's Libuv transport, see reference 6; the idea is the same.
In fact, the networking optimizations in Orleans 3.x follow a similar line of thinking:
```csharp
 1 private async Task ProcessOutgoing()
 2 {
 3     await Task.Yield();
 4
 5     Exception error = default;
 6     PipeWriter output = default;
 7     var serializer = this.serviceProvider.GetRequiredService<IMessageSerializer>();
 8     try
 9     {
10         output = this.Context.Transport.Output;
11         var reader = this.outgoingMessages.Reader;
12         if (this.Log.IsEnabled(LogLevel.Information))
13         {
14             this.Log.LogInformation(
15                 "Starting to process messages from local endpoint {Local} to remote endpoint {Remote}",
16                 this.LocalEndPoint,
17                 this.RemoteEndPoint);
18         }
19
20         while (true)
21         {
22             var more = await reader.WaitToReadAsync();
23             if (!more)
24             {
25                 break;
26             }
27
28             Message message = default;
29             try
30             {
31                 while (inflight.Count < inflight.Capacity && reader.TryRead(out message) && this.PrepareMessageForSend(message))
32                 {
33                     inflight.Add(message);
34                     var (headerLength, bodyLength) = serializer.Write(ref output, message);
35                     MessagingStatisticsGroup.OnMessageSend(this.MessageSentCounter, message, headerLength + bodyLength, headerLength, this.ConnectionDirection);
36                 }
37             }
38             catch (Exception exception) when (message != default)
39             {
40                 this.OnMessageSerializationFailure(message, exception);
41             }
42
43             var flushResult = await output.FlushAsync();
44             if (flushResult.IsCompleted || flushResult.IsCanceled)
45             {
46                 break;
47             }
48
49             inflight.Clear();
50         }
// ... (excerpt truncated)
```
The core is line 31, where the batch of messages is serialized into the writer, and line 43, where the flush happens. The only difference is that Orleans uses System.IO.Pipelines while DotNetty uses the traditional socket model.
Done this way, an application can sustain higher throughput within a limited pps budget.
Personally, I find DotNetty a bit more pleasant to use.
References:
1. https://github.com/Azure/DotNetty/blob/dev/src/DotNetty.Transport/Channels/Sockets/TcpSocketChannel.cs#L271-L288
2. https://github.com/dotnet/orleans/blob/master/src/Orleans.Core/Networking/Connection.cs#L282-L294
3. https://docs.microsoft.com/zh-cn/windows/win32/winsock/scatter-gather-i-o-2
4. https://linux.die.net/man/2/writev
5. https://github.com/chenshuo/muduo/blob/d980315dc054b122612f423ee2e1316cb14bd3b5/muduo/net/Buffer.cc#L28-L38
6. https://github.com/Azure/DotNetty/blob/dev/src/DotNetty.Transport.Libuv/Native/WriteRequest.cs#L106-L128