非同步socket不是一個新的概念,但是,對於普通開發人員來說很難掌握。 網上也有許多這方面的文章。大都比較零散。 因工作的需要,我編寫了此封裝庫。 本人認為較好的完成對非同步socket的封裝! 即不失性能,又非常易於使用。 此庫將socket處理分為幾個模塊,清晰明瞭。 有些模塊處理可能沒達到最佳狀... ...
前言
socket是軟體之間通訊最常用的一種方式。c#實現socket通訊有很多中方法,其中效率最高就是非同步通訊。
非同步通訊實際是利用windows完成埠(IOCP)來處理的,關於完成埠實現原理,大家可以參考網上文章。
我這裡想強調的是採用完成埠機制的非同步通訊是windows下效率最高的通訊方式,沒有之一!
非同步通訊比同步通訊處理要難很多,代碼編寫中會遇到許多“坑“。如果沒有經驗,很難完成。
我搜集了大量資料,完成了對非同步socket的封裝。此庫已用穩定高效的運行幾個月。
縱觀網上的資料,我還沒有遇到一個滿意的封裝庫。許多文章把數據收發和協議處理雜糅在一塊,代碼非常難懂,也無法擴展。
在編寫該庫時,避免以上缺陷。將邏輯處理層次化,模塊化!同時實現了高可用性與高性能。
為了使大家對通訊效率有初步瞭解,先看測試圖。
主機配置情況
百兆帶寬基本占滿,cpu占用40%,我的電腦在空閑時,cpu占用大概20%,也就是說程式占用cpu 20%左右。
這個庫是可擴展的,就是說即使10萬個連接,收發同樣的數據,cpu占用基本相同。
庫的結構圖
目標
- 即可作為服務端(監聽)也可以作為客戶端(主動連接)使用。
- 可以適應任何網路協議。收發的數據針對位元組流或一個完整的包。對協議內容不做處理。
- 高可用性。將複雜的底層處理封裝,對外介面非常友好。
- 高性能。最大限度優化處理。單機可支持數萬連接,收發速度可達幾百兆bit。
實現思路
網路處理邏輯可以分為以下幾個部分:
- 網路監聽 可以在多個埠實現監聽。負責生成socket,生成的socket供後續處理。監聽模塊功能比較單一,如有必要,可對監聽模塊做進一步優化。
- 主動連接 可以非同步或同步的連接對方。連接成功後,對socket的後續處理,與監聽得到的socket完全一樣。註:無論是監聽得到的socket,還是連接得到的socket,後續處理完全一樣。
- Socket收發處理 每個socket對應一個收發實例,socket收發只針對位元組流處理。收發時,做了優化。比如發送時,對數據做了沾包,提高發送性能;接收時,一次投遞1K的數據。
- 組包處理 一般數據包都有包長度指示;比如 報頭的前倆個位元組表示長度,根據這個值就可以組成一個完整的包。
NetListener 監聽
using System; using System.Net; using System.Net.Sockets; using System.Threading; namespace IocpCore { class NetListener { private Socket listenSocket; public ListenParam _listenParam { get; set; } public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket; bool start; NetServer _netServer; public NetListener(NetServer netServer) { _netServer = netServer; } public int _acceptAsyncCount = 0; public bool StartListen() { try { start = true; IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port); listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(listenPoint); listenSocket.Listen(200); Thread thread1 = new Thread(new ThreadStart(NetProcess)); thread1.Start(); StartAccept(); return true; } catch (Exception ex) { NetLogger.Log(string.Format("**監聽異常!{0}", ex.Message)); return false; } } AutoResetEvent _acceptEvent = new AutoResetEvent(false); private void NetProcess() { while (start) { DealNewAccept(); _acceptEvent.WaitOne(1000 * 10); } } private void DealNewAccept() { try { if(_acceptAsyncCount <= 10) { StartAccept(); } while (true) { AsyncSocketClient client = _newSocketClientList.GetObj(); if (client == null) break; DealNewAccept(client); } } catch (Exception ex) { NetLogger.Log(string.Format("DealNewAccept 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } private void DealNewAccept(AsyncSocketClient client) { client.SendBufferByteCount = _netServer.SendBufferBytePerClient; OnAcceptSocket?.Invoke(_listenParam, client); } private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs) { try { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } catch (Exception ex) { NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace)); } } public bool StartAccept() { SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs(); acceptEventArgs.Completed += AcceptEventArg_Completed; bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs); Interlocked.Increment(ref _acceptAsyncCount); if (!willRaiseEvent) { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } return true; } ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>(); private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs) { try { using (acceptEventArgs) { if (acceptEventArgs.AcceptSocket != null) { AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket); client.CreateClientInfo(this); _newSocketClientList.PutObj(client); _acceptEvent.Set(); } } } catch (Exception ex) { NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace)); } } } }
NetConnectManage連接處理
1 using System; 2 using System.Net; 3 using System.Net.Sockets; 4 5 namespace IocpCore 6 { 7 class NetConnectManage 8 { 9 public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent; 10 11 public bool ConnectAsyn(string peerIp, int peerPort, object tag) 12 { 13 try 14 { 15 Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp); 16 SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs(); 17 socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); 18 socketEventArgs.Completed += SocketConnect_Completed; 19 20 SocketClientInfo clientInfo = new SocketClientInfo(); 21 socketEventArgs.UserToken = clientInfo; 22 clientInfo.PeerIp = peerIp; 23 clientInfo.PeerPort = peerPort; 24 clientInfo.Tag = tag; 25 26 bool willRaiseEvent = socket.ConnectAsync(socketEventArgs); 27 if (!willRaiseEvent) 28 { 29 ProcessConnect(socketEventArgs); 30 socketEventArgs.Completed -= SocketConnect_Completed; 31 socketEventArgs.Dispose(); 32 } 33 return true; 34 } 35 catch (Exception ex) 36 { 37 NetLogger.Log("ConnectAsyn",ex); 38 return false; 39 } 40 } 41 42 private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs) 43 { 44 ProcessConnect(socketEventArgs); 45 socketEventArgs.Completed -= SocketConnect_Completed; 46 socketEventArgs.Dispose(); 47 } 48 49 private void ProcessConnect(SocketAsyncEventArgs socketEventArgs) 50 { 51 SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo; 52 if (socketEventArgs.SocketError == SocketError.Success) 53 { 54 DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo); 55 } 56 else 57 { 58 SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null); 59 socketParam.ClientInfo = clientInfo; 60 OnSocketConnectEvent?.Invoke(socketParam, null); 61 } 62 } 63 64 65 void DealConnectSocket(Socket socket, SocketClientInfo clientInfo) 66 { 67 clientInfo.SetClientInfo(socket); 68 69 AsyncSocketClient client = new AsyncSocketClient(socket); 70 client.SetClientInfo(clientInfo); 71 72 //觸發事件 73 SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket); 74 socketParam.ClientInfo = clientInfo; 75 OnSocketConnectEvent?.Invoke(socketParam, client); 76 } 77 78 public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) 79 { 80 socket = null; 81 try 82 { 83 Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp); 84 85 SocketClientInfo clientInfo = new SocketClientInfo(); 86 clientInfo.PeerIp = peerIp; 87 clientInfo.PeerPort = peerPort; 88 clientInfo.Tag = tag; 89 90 EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); 91 socketTmp.Connect(remoteEP); 92 if (!socketTmp.Connected) 93 return false; 94 95 DealConnectSocket(socketTmp, clientInfo); 96 socket = socketTmp; 97 return true; 98 } 99 catch (Exception ex) 100 { 101 NetLogger.Log(string.Format("連接對方:({0}:{1})出錯!", peerIp, peerPort), ex); 102 return false; 103 } 104 } 105 } 106 }View Code
AsyncSocketClient socket收發處理
1 using System; 2 using System.Collections.Generic; 3 using System.Diagnostics; 4 using System.Net; 5 using System.Net.Sockets; 6 7 namespace IocpCore 8 { 9 public class AsyncSocketClient 10 { 11 public static int IocpReadLen = 1024; 12 13 public readonly Socket ConnectSocket; 14 15 protected SocketAsyncEventArgs m_receiveEventArgs; 16 public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } } 17 protected byte[] m_asyncReceiveBuffer; 18 19 protected SocketAsyncEventArgs m_sendEventArgs; 20 public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } } 21 protected byte[] m_asyncSendBuffer; 22 23 public event Action<AsyncSocketClient, byte[]> OnReadData; 24 public event Action<AsyncSocketClient, int> OnSendData; 25 public event Action<AsyncSocketClient> OnSocketClose; 26 27 static object releaseLock = new object(); 28 public static int createCount = 0; 29 public static int releaseCount = 0; 30 31 ~AsyncSocketClient() 32 { 33 lock (releaseLock) 34 { 35 releaseCount++; 36 } 37 } 38 39 public AsyncSocketClient(Socket socket) 40 { 41 lock (releaseLock) 42 { 43 createCount++; 44 } 45 46 ConnectSocket = socket; 47 48 m_receiveEventArgs = new SocketAsyncEventArgs(); 49 m_asyncReceiveBuffer = new byte[IocpReadLen]; 50 m_receiveEventArgs.AcceptSocket = ConnectSocket; 51 m_receiveEventArgs.Completed += ReceiveEventArgs_Completed; 52 53 m_sendEventArgs = new SocketAsyncEventArgs(); 54 m_asyncSendBuffer = new byte[IocpReadLen * 2]; 55 m_sendEventArgs.AcceptSocket = ConnectSocket; 56 m_sendEventArgs.Completed += SendEventArgs_Completed; 57 } 58 59 SocketClientInfo _clientInfo; 60 61 public SocketClientInfo ClientInfo 62 { 63 get 64 { 65 return _clientInfo; 66 } 67 } 68 69 internal void CreateClientInfo(NetListener netListener) 70 { 71 _clientInfo = new SocketClientInfo(); 72 try 73 { 74 _clientInfo.Tag = netListener._listenParam._tag; 75 IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint; 76 Debug.Assert(netListener._listenParam._port == ip.Port); 77 78 _clientInfo.LocalIp = ip.Address.ToString(); 79 _clientInfo.LocalPort = netListener._listenParam._port; 80 81 ip = ConnectSocket.RemoteEndPoint as IPEndPoint; 82 _clientInfo.PeerIp = ip.Address.ToString(); 83 _clientInfo.PeerPort = ip.Port; 84 } 85 catch (Exception ex) 86 { 87 NetLogger.Log("CreateClientInfo", ex); 88 } 89 } 90 internal void SetClientInfo(SocketClientInfo clientInfo) 91 { 92 _clientInfo = clientInfo; 93 } 94 95 #region read process 96 bool _inReadPending = false; 97 public EN_SocketReadResult ReadNextData() 98 { 99 lock (this) 100 { 101 if (_socketError) 102 return EN_SocketReadResult.ReadError; 103 if (_inReadPending) 104 return EN_SocketReadResult.InAsyn; 105 if(!ConnectSocket.Connected) 106 { 107 OnReadError(); 108 return EN_SocketReadResult.ReadError; 109 } 110 111 try 112 { 113 m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length); 114 _inReadPending = true; 115 bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投遞接收請求 116 if (!willRaiseEvent) 117 { 118 _inReadPending = false; 119 ProcessReceive(); 120 if (_socketError) 121 { 122 OnReadError(); 123 return EN_SocketReadResult.ReadError; 124 } 125 return EN_SocketReadResult.HaveRead; 126 } 127 else 128 { 129 return EN_SocketReadResult.InAsyn; 130 } 131 } 132 catch (Exception ex) 133 { 134 NetLogger.Log("ReadNextData", ex); 135 _inReadPending = false; 136 OnReadError(); 137 return EN_SocketReadResult.ReadError; 138 } 139 } 140 } 141 142 private void ProcessReceive() 143 { 144 if (ReceiveEventArgs.BytesTransferred > 0 145 && ReceiveEventArgs.SocketError == SocketError.Success) 146 { 147 int offset = ReceiveEventArgs.Offset; 148 int count = ReceiveEventArgs.BytesTransferred; 149 150 byte[] readData = new byte[count]; 151 Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count); 152 153 _inReadPending = false; 154 if (!_socketError) 155 OnReadData?.Invoke(this, readData); 156 } 157 else 158 { 159 _inReadPending = false; 160 OnReadError(); 161 } 162 } 163 164 private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e) 165 { 166 lock (this) 167 { 168 _inReadPending = false; 169 ProcessReceive(); 170 if (_socketError) 171 { 172 OnReadError(); 173 } 174 } 175 } 176 177 bool _socketError = false; 178 private void OnReadError() 179 { 180 lock (this) 181 { 182 if (_socketError == false) 183 { 184 _socketError = true; 185 OnSocketClose?.Invoke(this); 186 } 187 CloseClient(); 188 } 189 } 190 #endregion 191 192 #region send process 193 int _sendBufferByteCount = 102400; 194 public int SendBufferByteCount 195 { 196 get 197 { 198 return _sendBufferByteCount; 199 } 200 set 201 { 202 if (value < 1024) 203 { 204 _sendBufferByteCount = 1024; 205 } 206 else 207 { 208 _sendBufferByteCount = value; 209 } 210 } 211 } 212 213 SendBufferPool _sendDataPool = new SendBufferPool(); 214 internal EN_SendDataResult PutSendData(byte[] data) 215 { 216 if (_socketError) 217 return EN_SendDataResult.no_client; 218 219 if (_sendDataPool._bufferByteCount >= _sendBufferByteCount) 220 { 221 return EN_SendDataResult.buffer_overflow; 222 } 223 224 if (data.Length <= IocpReadLen) 225 { 226 _sendDataPool.PutObj(data); 227 } 228 else 229 { 230 List<byte[]> dataItems = SplitData(data, IocpReadLen); 231 foreach (byte[] item in dataItems) 232 { 233 _sendDataPool.PutObj(item); 234 } 235 } 236 237 return EN_SendDataResult.ok; 238 } 239 240 bool _inSendPending = false; 241 public EN_SocketSendResult SendNextData() 242 { 243 lock (this) 244 { 245 if (_socketError) 246 { 247 return EN_SocketSendResult.SendError; 248 } 249 250 if (_inSendPending) 251 { 252 return EN_SocketSendResult.InAsyn; 253 } 254 255 int sendByteCount = GetSendData(); 256 if (sendByteCount == 0) 257 { 258 return EN_SocketSendResult.NoSendData; 259 } 260 261 //防止拋出異常,否則影響性能 262 if (!ConnectSocket.Connected) 263 { 264 OnSendError(); 265 return EN_SocketSendResult.SendError; 266 } 267 268 try 269 { 270 m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount); 271 _inSendPending = true; 272 bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs); 273 if (!willRaiseEvent) 274 { 275 _inSendPending = false; 276 ProcessSend(m_sendEventArgs); 277 if (_socketError) 278 { 279 OnSendError(); 280 return EN_SocketSendResult.SendError; 281 } 282 else 283 { 284 OnSendData?.Invoke(this, sendByteCount); 285 //繼續發下一條 286 return EN_SocketSendResult.HaveSend; 287 } 288 } 289 else 290 { 291 return EN_SocketSendResult.InAsyn; 292 } 293 } 294 catch (Exception ex) 295 { 296 NetLogger.Log("SendNextData", ex); 297 _inSendPending = false; 298 OnSendError(); 299 return EN_SocketSendResult.SendError; 300 } 301 } 302 } 303 304 private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs) 305 { 306 lock (this) 307 { 308 try 309 { 310 _inSendPending = false; 311 ProcessSend(m_sendEventArgs); 312 313 int sendCount = 0; 314 if (sendEventArgs.SocketError == SocketError.Success) 315 { 316 sendCount = sendEventArgs.BytesTransferred; 317 } 318 OnSendData?.Invoke(this, sendCount); 319 320 if (_socketError) 321 { 322 OnSendError(); 323 } 324 } 325 catch (Exception ex) 326 { 327 NetLogger.Log("SendEventArgs_Completed", ex); 328 } 329 } 330 } 331 332 private bool ProcessSend(SocketAsyncEventArgs sendEventArgs) 333 { 334 if (sendEventArgs.SocketError == SocketError.Success) 335 { 336 return true; 337 } 338 else 339 { 340 OnSendError(); 341 return false; 342 } 343 } 344 345 private int GetSendData() 346 { 347 int dataLen = 0; 348 whil