項目文件結構圖 1. 消息監聽器(黃色框) 這部分由 Netty 實現,Netty是一個非同步且非阻塞的通信框架。TCP通信實現服務端和客戶端的交互。 Netty 的簡單描述如下: 客戶端(調用方):負責發送要執行的指令。 服務端(接收方):分為主從線程。主線程負責接收指令,將指令存入緩存區中,等待執 ...
項目文件結構圖
1. 消息監聽器(黃色框)
這部分由 Netty 實現,Netty是一個非同步且非阻塞的通信框架。TCP通信實現服務端和客戶端的交互。
Netty 的簡單描述如下:
客戶端(調用方):負責發送要執行的指令。
服務端(接收方):分為主從線程。主線程負責接收指令,將指令存入緩存區中,等待執行完成後再通知客戶端(非阻塞);
從線程,有不止一個線程(非同步),負責從緩存池中取出線程依次執行(按隊列先後順序執行),我們通過程式來決定先執行哪個,比如先解碼,後執行,再編碼。
2. 指令執行器(紅色框)
上圖中只有服務端的實現,服務端接收到指令後會回調執行器中的過程。
客戶端的時候需要在業務場景中來實現,根據業務不同,接收到服務端消息後執行的過程也不同。然後通過控制反轉,由程式自動找到該過程。
3. 消息發送器(藍色框)
將要發送的消息寫入到通信管道中,服務端和客戶端都有實現。
4. 客戶端工廠(綠色框)
為啥沒有服務端工廠呢,因為服務端是由服務端主機(Host)直接創建的,主機直接調用監聽器監聽埠。既然我們重寫了通信過程,就不能用微軟原有的WebHost,後續會講到如何搭建自己的主機。
客戶端工廠用來創建客戶端,然後與服務端主機通信。
5. 序列化工具(白色框)
上節中我們用MessagePack實現了序列化與反序列化,本節為通信,自然離不開消息序列化。
1. 消息監聽器(黃色框)
上層介面:
/// <summary> /// 接受到消息的委托。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> public delegate Task ReceivedDelegate(IMessageSender sender, TransportMessage message); /// <summary> /// 一個抽象的消息監聽者。 /// </summary> public interface IMessageListener { /// <summary> /// 接收到消息的事件。 /// </summary> event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> Task OnReceived(IMessageSender sender, TransportMessage message); }
客戶端:
/// <summary> /// 消息監聽者。 /// </summary> public class DotNettyClientMessageListener : IMessageListener { #region Implementation of IMessageListener /// <summary> /// 接收到消息的事件。 /// </summary> public event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> public async Task OnReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; await Received(sender, message); } #endregion Implementation of IMessageListener }
服務端:
public class DotNettyServerMessageListener : IMessageListener, IDisposable { #region Field private readonly ILogger<DotNettyServerMessageListener> _logger; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ITransportMessageEncoder _transportMessageEncoder; private IChannel _channel; #endregion Field #region Constructor public DotNettyServerMessageListener(ILogger<DotNettyServerMessageListener> logger, ITransportMessageCodecFactory codecFactory) { _logger = logger; _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); } #endregion Constructor #region Implementation of IMessageListener public event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> public async Task OnReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; await Received(sender, message); } #endregion Implementation of IMessageListener public async Task StartAsync(EndPoint endPoint) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備啟動服務主機,監聽地址:{endPoint}。"); IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1); IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2 var bootstrap = new ServerBootstrap(); bootstrap .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 128) .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default) .Group(bossGroup, workerGroup) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { var pipeline = channel.Pipeline; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); pipeline.AddLast(new ServerHandler(async (contenxt, message) => { var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt); await OnReceived(sender, message); }, _logger)); })); try { _channel = await bootstrap.BindAsync(endPoint); if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"服務主機啟動成功,監聽地址:{endPoint}。"); } catch { _logger.LogError($"服務主機啟動失敗,監聽地址:{endPoint}。 "); } } public void CloseAsync() { Task.Run(async () => { await _channel.EventLoop.ShutdownGracefullyAsync(); await _channel.CloseAsync(); }).Wait(); } #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { Task.Run(async () => { await _channel.DisconnectAsync(); }).Wait(); } #endregion Implementation of IDisposable #region Help Class private class ServerHandler : ChannelHandlerAdapter { private readonly Action<IChannelHandlerContext, TransportMessage> _readAction; private readonly ILogger _logger; public ServerHandler(Action<IChannelHandlerContext, TransportMessage> readAction, ILogger logger) { _readAction = readAction; _logger = logger; } #region Overrides of ChannelHandlerAdapter public override void ChannelRead(IChannelHandlerContext context, object message) { Task.Run(() => { var transportMessage = (TransportMessage)message; _readAction(context, transportMessage); }); } public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { context.CloseAsync();//客戶端主動斷開需要應答,否則socket變成CLOSE_WAIT狀態導致socket資源耗盡 if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError(null,exception, $"與伺服器:{context.Channel.RemoteAddress}通信時發送了錯誤。"); } #endregion Overrides of ChannelHandlerAdapter } #endregion Help Class }
2. 指令執行器(紅色框)
上層介面:
/// <summary> /// 一個抽象的服務執行器。 /// </summary> public interface IServiceExecutor { /// <summary> /// 執行。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">調用消息。</param> Task ExecuteAsync(IMessageSender sender, TransportMessage message); }
實現類,代碼中沒有處理邏輯(服務發現、執行,後續有專題來談),只有輸出列印 "服務提供者接收到消息。" :
public class HttpServiceExecutor : IServiceExecutor { #region Field private readonly ILogger<HttpServiceExecutor> _logger; #endregion Field #region Constructor public HttpServiceExecutor(ILogger<HttpServiceExecutor> logger) { _logger = logger; } #endregion Constructor #region Implementation of IServiceExecutor /// <summary> /// 執行。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">調用消息。</param> public async Task ExecuteAsync(IMessageSender sender, TransportMessage message) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("服務提供者接收到消息。"); return; } #endregion Implementation of IServiceExecutor }
3. 消息發送器(藍色框)
上層介面:
/// <summary> /// 一個抽象的發送者。 /// </summary> public interface IMessageSender { /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> Task SendAsync(TransportMessage message); /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> Task SendAndFlushAsync(TransportMessage message); }
實現類基類:
/// <summary> /// 基於DotNetty的消息發送者基類。 /// </summary> public abstract class DotNettyMessageSender { private readonly ITransportMessageEncoder _transportMessageEncoder; protected DotNettyMessageSender(ITransportMessageEncoder transportMessageEncoder) { _transportMessageEncoder = transportMessageEncoder; } protected IByteBuffer GetByteBuffer(TransportMessage message) { var data = _transportMessageEncoder.Encode(message); //var buffer = PooledByteBufferAllocator.Default.Buffer(); return Unpooled.WrappedBuffer(data); } }
服務端:
/// <summary> /// 基於DotNetty服務端的消息發送者。 /// </summary> public class DotNettyServerMessageSender : DotNettyMessageSender, IMessageSender { private readonly IChannelHandlerContext _context; public DotNettyServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IChannelHandlerContext context) : base(transportMessageEncoder) { _context = context; } #region Implementation of IMessageSender /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _context.WriteAsync(buffer); } /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _context.WriteAndFlushAsync(buffer); } #endregion Implementation of IMessageSender }
客戶端:
/// <summary> /// 基於DotNetty客戶端的消息發送者。 /// </summary> public class DotNettyMessageClientSender : DotNettyMessageSender, IMessageSender, IDisposable { private readonly IChannel _channel; public DotNettyMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IChannel channel) : base(transportMessageEncoder) { _channel = channel; } #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { Task.Run(async () => { await _channel.DisconnectAsync(); }).Wait(); } #endregion Implementation of IDisposable #region Implementation of IMessageSender /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _channel.WriteAndFlushAsync(buffer); } /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _channel.WriteAndFlushAsync(buffer); } #endregion Implementation of IMessageSender }
4. 客戶端工廠(綠色框)
上層介面:
/// <summary> /// 一個抽象的傳輸客戶端工廠。 /// </summary> public interface ITransportClientFactory { /// <summary> /// 創建客戶端。 /// </summary> /// <param name="endPoint">終結點。</param> /// <returns>傳輸客戶端實例。</returns> Task<ITransportClient> CreateClientAsync(EndPoint endPoint); }
/// <summary> /// 一個抽象的傳輸客戶端。 /// </summary> public interface ITransportClient { /// <summary> /// 發送消息。 /// </summary> /// <param name="message">遠程調用消息模型。</param> /// <returns>遠程調用消息的傳輸消息。</returns> Task SendAsync(TransportMessage transportMessage); }
實現類:
/// <summary> /// 基於DotNetty的傳輸客戶端工廠。 /// </summary> public class DotNettyTransportClientFactory : ITransportClientFactory, IDisposable { #region Field private readonly ITransportMessageEncoder _transportMessageEncoder; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ILogger<DotNettyTransportClientFactory> _logger; private readonly IServiceExecutor _serviceExecutor; private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>(); private readonly Bootstrap _bootstrap; private static readonly AttributeKey<IMessageSender> messageSenderKey = AttributeKey<IMessageSender>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageSender)); private static readonly AttributeKey<IMessageListener> messageListenerKey = AttributeKey<IMessageListener>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageListener)); private static readonly AttributeKey<EndPoint> origEndPointKey = AttributeKey<EndPoint>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(EndPoint)); #endregion Field #region Constructor public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger) : this(codecFactory, logger, null) { } public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger, IServiceExecutor serviceExecutor) { _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); _logger = logger; _serviceExecutor = serviceExecutor; _bootstrap = GetBootstrap(); _bootstrap.Handler(new ActionChannelInitializer<ISocketChannel>(c => { var pipeline = c.Pipeline; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); pipeline.AddLast(new DefaultChannelHandler(this)); })); } #endregion Constructor #region Implementation of ITransportClientFactory /// <summary> /// 創建客戶端。 /// </summary> /// <param name="endPoint">終結點。</param> /// <returns>傳輸客戶端實例。</returns> public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint) { var key = endPoint; if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備為服務端地址:{key}創建客戶端。"); try { return await _clients.GetOrAdd(key , k => new Lazy<Task<ITransportClient>>(async () => { //客戶端對象 var bootstrap = _bootstrap; //非同步連接返回channel var channel = await bootstrap.ConnectAsync(k); var messageListener = new DotNettyClientMessageListener(); //設置監聽 channel.GetAttribute(messageListenerKey).Set(messageListener); //實例化發送者 var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel); //設置channel屬性 channel.GetAttribute(messageSenderKey).Set(messageSender); channel.GetAttribute(origEndPointKey).Set(k); //創建客戶端 var client = new DotNettyTransportClient(messageSender, messageListener, _logger, _serviceExecutor); return client; } )).Value;//返回實例 } catch { throw; } } #endregion Implementation of ITransportClientFactory #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { foreach (var client in _clients.Values.Where(i => i.IsValueCreated)) { (client.Value as IDisposable)?.Dispose(); } } #endregion Implementation of IDisposable private static Bootstrap GetBootstrap() { IEventLoopGroup group; var bootstrap = new Bootstrap(); group = new MultithreadEventLoopGroup(); bootstrap.Channel<TcpServerSocketChannel>(); bootstrap .Channel<TcpSocketChannel>() .Option(ChannelOption.TcpNodelay, true) .Option(ChannelOption.Allocator, PooledByteBufferAllocator.Default) .Group(group); return bootstrap; } protected class DefaultChannelHandler : ChannelHandlerAdapter { private readonly DotNettyTransportClientFactory _factory; public DefaultChannelHandler(DotNettyTransportClientFactory factory) { this._factory = factory; } #region Overrides of ChannelHandlerAdapter public override void ChannelInactive(IChannelHandlerContext context) { _factory._clients.TryRemove(context.Channel.GetAttribute(origEndPointKey).Get(), out var value); } public override void ChannelRead(IChannelHandlerContext context, object message) { var transportMessage = message as TransportMessage; var messageListener = context.Channel.GetAttribute(messageListenerKey).Get(); var messageSender = context.Channel.GetAttribute(messageSenderKey).Get(); messageListener.OnReceived(messageSender, transportMessage); } #endregion Overrides of ChannelHandlerAdapter } }
/// <summary> /// 一個預設的傳輸客戶端實現。 /// </summary> public class DotNettyTransportClient : ITransportClient, IDisposable { #region Field private readonly IMessageSender _messageSender; private readonly IMessageListener _messageListener; private readonly ILogger _logger; private readonly IServiceExecutor _serviceExecutor; #endregion Field #region Constructor public DotNettyTransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger, IServiceExecutor serviceExecutor) { _messageSender = messageSender; _messageListener = messageListener; _logger = logger; _serviceExecutor = serviceExecutor; messageListener.Received += MessageListener_Received; } #endregion Constructor #region Implementation of ITransportClient /// <summary> /// 發送消息。 /// </summary> /// <param name="message">遠程調用消息模型。</param> /// <returns>遠程調用消息的傳輸消息。</returns> public async Task SendAsync(TransportMessage transportMessage) { try { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("準備發送消息。"); try { //發送 await _messageSender.SendAndFlushAsync(transportMessage); } catch (Exception exception) { throw new Exception("與服務端通訊時發生了異常。", exception); } if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("消息發送成功。"); } catch (Exception exception) { if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError(null,exception, "消息發送失敗。"); throw; } } #endregion Implementation of ITransportClient #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { (_messageSender as IDisposable)?.Dispose(); (_messageListener as IDisposable)?.Dispose(); } #endregion Implementation of IDisposable #region Private Method private async Task MessageListener_Received(IMessageSender sender, TransportMessage message) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("服務消費者接收到消息。"); if (_serviceExecutor != null) await _serviceExecutor.ExecuteAsync(sender, message); } #endregion Private Method }
5. 序列化工具(白色框)
需要繼承自 DotNetty.Transport.Channels.ChannelHandlerAdapter,才能被 netty 調用:
public class TransportMessageChannelHandlerAdapter : ChannelHandlerAdapter { private readonly ITransportMessageDecoder _transportMessageDecoder; public TransportMessageChannelHandlerAdapter(ITransportMessageDecoder transportMessageDecoder) { _transportMessageDecoder = transportMessageDecoder; } #region Overrides of ChannelHandlerAdapter public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = (IByteBuffer)message; var data = new byte[buffer.ReadableBytes]; buffer.ReadBytes(data); var transportMessage = _transportMessageDecoder.Decode(data); context.FireChannelRead(transportMessage); ReferenceCountUtil.Release(buffer); } #endregion Overrides of ChannelHandlerAdapter }