網上有很多Socket框架,但是我想,C#既然有Socket類,難道不是給人用的嗎? 寫了一個SocketServerHelper和SocketClientHelper,分別隻有5、6百行代碼,比不上大神寫的,和業務代碼耦合也比較重,但對新手非常友好,容易看懂。 支持返回值或回調,支持不定長度的數據 ...
網上有很多Socket框架,但是我想,C#既然有Socket類,難道不是給人用的嗎?
寫了一個SocketServerHelper和SocketClientHelper,分別隻有5、6百行代碼,比不上大神寫的,和業務代碼耦合也比較重,但對新手非常友好,容易看懂。
支持返回值或回調,支持不定長度的數據包。客戶端和服務端均支持斷線重連。
自己本機測試,5000個客戶端併發發送消息正常,CPU壓力有點大。由於區域網機子性能差,區域網只測試了500個客戶端併發發送消息正常。
短短1000多行代碼,花了好多天心血,改了無數BUG,越寫代碼,越覺得自己資質平平,邏輯思維不夠用。寫Socket代碼不像寫一般的代碼,實在不行加個try catch完事,這個東西既要穩定,又要性能,真的是每一個邏輯分支,每一個異常分支,都要想清楚,都要處理好,代碼里我還是Exception用習慣了,沒細分。
有時候為瞭解決一個BUG,找了一整天,也找不出BUG在哪,現在終於測試難過了,達到了自己的預想。
通過這幾天的踩坑,測試,得出結論:
1、Socket TCP 不會丟包,TCP是可靠的。(本機測試、區域網測試,可能沒有遇到更惡劣的網路環境)
2、Socket TCP 能夠保證順序,接收到的順序和發送的順序一致
3、代碼里有數據校驗,但是錯誤的分支永遠都不會走,校驗是一定能通過的,不存在數據校驗不通過,把錯誤的數據包簡單丟棄的情況,否則說明代碼寫的還是有BUG
以下是主要代碼:
SocketServerHelper代碼:
using Models; using Newtonsoft.Json; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Net; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// Socket服務端幫助類 /// </summary> public class SocketServerHelper { #region 變數 private int _serverPort; private Socket serverSocket; private ConcurrentDictionary<ClientSocket, string> clientSocketList = new ConcurrentDictionary<ClientSocket, string>(); private ConcurrentDictionary<string, ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<string, ClientSocket>(); private ConcurrentDictionary<string, ClientSocket> _dictDevNoClientSocket = new ConcurrentDictionary<string, ClientSocket>(); public int _CallbackTimeout = 20; /// <summary> /// 等待回調超時時間(單位:秒) /// </summary> public int CallbackTimeout { get { return _CallbackTimeout; } set { value = _CallbackTimeout; } } public int _WaitResultTimeout = 20; /// <summary> /// 等待返回結果超時時間(單位:秒) /// </summary> public int WaitResultTimeout { get { return _WaitResultTimeout; } set { value = _WaitResultTimeout; } } private object _lockSend = new object(); public event EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent; private System.Timers.Timer _checkClientTimer; #endregion #region SocketServerHelper 構造函數 public SocketServerHelper(int serverPort) { _serverPort = serverPort; } #endregion #region 啟動服務 /// <summary> /// 啟動服務 /// </summary> public bool StartServer() { try { IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, _serverPort); serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); serverSocket.Bind(ipEndPoint); serverSocket.Listen(5000); Thread thread = new Thread(new ThreadStart(delegate () { while (true) { Socket client = null; ClientSocket clientSocket = null; try { client = serverSocket.Accept(); client.SendTimeout = 20000; client.ReceiveTimeout = 20000; client.SendBufferSize = 10240; client.ReceiveBufferSize = 10240; clientSocket = new ClientSocket(client); clientSocketList.TryAdd(clientSocket, null); LogUtil.Log("監聽到新的客戶端,當前客戶端數:" + clientSocketList.Count); } catch (Exception ex) { LogUtil.Error(ex); Thread.Sleep(1); continue; } if (client == null) continue; Task.Run(() => { try { byte[] buffer = new byte[10240]; SocketAsyncEventArgs args = new SocketAsyncEventArgs(); clientSocket.SocketAsyncArgs = args; clientSocket.SocketAsyncCompleted = (s, e) => { ReceiveData(clientSocket, e); }; args.SetBuffer(buffer, 0, buffer.Length); args.Completed += clientSocket.SocketAsyncCompleted; client.ReceiveAsync(args); } catch (Exception ex) { LogUtil.Error(ex); } }); } })); thread.IsBackground = true; thread.Start(); //檢測客戶端 _checkClientTimer = new System.Timers.Timer(); _checkClientTimer.AutoReset = false; _checkClientTimer.Interval = 1000; _checkClientTimer.Elapsed += CheckClient; _checkClientTimer.Start(); LogUtil.Log("服務已啟動"); return true; } catch (Exception ex) { LogUtil.Error(ex, "啟動服務出錯"); return false; } } #endregion #region 檢測客戶端 /// <summary> /// 檢測客戶端 /// </summary> private void CheckClient(object sender, System.Timers.ElapsedEventArgs e) { Task.Run(() => { try { foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray()) { Socket skt = clientSkt.Socket; ClientSocket temp; string strTemp; DateTime now = DateTime.Now; if (now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60) { clientSocketList.TryRemove(clientSkt, out strTemp); LogUtil.Log("客戶端已失去連接,當前客戶端數:" + clientSocketList.Count); ActionUtil.TryDoAction(() => { if (skt.Connected) skt.Disconnect(false); }); ActionUtil.TryDoAction(() => { skt.Close(); skt.Dispose(); if (clientSkt.SocketAsyncArgs != null) { if (clientSkt.SocketAsyncCompleted != null) { clientSkt.SocketAsyncArgs.Completed -= clientSkt.SocketAsyncCompleted; } clientSkt.SocketAsyncArgs.Dispose(); } clientSkt.SocketAsyncCompleted = null; clientSkt.SocketAsyncArgs = null; }); } } } catch (Exception ex) { LogUtil.Error(ex, "檢測客戶端出錯"); } finally { _checkClientTimer.Start(); } }); } #endregion #region 接收數據 /// <summary> /// 處理接收的數據包 /// </summary> private void ReceiveData(ClientSocket clientSkt, SocketAsyncEventArgs e) { if (clientSkt == null) return; Socket skt = clientSkt.Socket; try { CopyTo(e.Buffer, clientSkt.Buffer, 0, e.BytesTransferred); #region 校驗數據 if (clientSkt.Buffer.Count < 4) { if (skt.Connected) skt.ReceiveAsync(e); return; } else { byte[] bArrHeader = new byte[4]; CopyTo(clientSkt.Buffer, bArrHeader, 0, 0, bArrHeader.Length); string strHeader = Encoding.ASCII.GetString(bArrHeader); if (strHeader.ToUpper() == "0XFF") { if (clientSkt.Buffer.Count < 5) { if (skt.Connected) skt.ReceiveAsync(e); return; } else { byte[] bArrType = new byte[1]; CopyTo(clientSkt.Buffer, bArrType, 4, 0, bArrType.Length); if (bArrType[0] == 0) { } //心跳包 else if (bArrType[0] == 2 || bArrType[0] == 4) //註冊包、返回值包 { if (clientSkt.Buffer.Count < 9) { if (skt.Connected) skt.ReceiveAsync(e); return; } else { byte[] bArrLength = new byte[4]; CopyTo(clientSkt.Buffer, bArrLength, 5, 0, bArrLength.Length); int dataLength = BitConverter.ToInt32(bArrLength, 0); if (dataLength == 0 || clientSkt.Buffer.Count < dataLength + 9) { if (skt.Connected) skt.ReceiveAsync(e); return; } } } else { LogUtil.Error("type錯誤,丟掉錯誤數據,重新接收"); clientSkt.Buffer.Clear(); //把錯誤的數據丟掉 if (skt.Connected) skt.ReceiveAsync(e); return; } } } else { LogUtil.Error("不是0XFF,丟掉錯誤數據,重新接收"); clientSkt.Buffer.Clear(); //把錯誤的數據丟掉 if (skt.Connected) skt.ReceiveAsync(e); return; } } #endregion SocketData data = null; do { data = ProcessSocketData(clientSkt); } while (data != null); if (skt.Connected) skt.ReceiveAsync(e); } catch (Exception ex) { LogUtil.Error(ex, "處理接收的數據包 異常"); } } #endregion #region 處理接收的數據包 /// <summary> /// 處理接收的數據包 /// </summary> private SocketData ProcessSocketData(ClientSocket clientSkt) { int readLength = 0; SocketData data = ResolveBuffer(clientSkt.Buffer, out readLength); if (data != null) { if (readLength > 0) clientSkt.RemoveBufferData(readLength); if (data.Type == 0) //收到心跳包 { clientSkt.LastHeartbeat = DateTime.Now; //心跳應答 if (clientSkt.RoomNo != null || clientSkt.DevNo != null) { ThreadHelper.Run(() => { lock (clientSkt.LockSend) { byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF"); SocketHelper.Send(clientSkt.Socket, bArrHeader); SocketHelper.Send(clientSkt.Socket, new byte[] { 0x01 }); } }); } else { LogUtil.Log("沒有註冊信息"); } LogUtil.Log("收到心跳包,客戶端連接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo); } if (data.Type == 2) //收到註冊包 { if (data.SocketRegisterData != null && clientSkt != null) { ClientSocket temp; if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.SocketRegisterData.RoomNo, out temp); if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.SocketRegisterData.DevNo, out temp); clientSkt.RoomNo = data.SocketRegisterData.RoomNo; clientSkt.DevNo = data.SocketRegisterData.DevNo; if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryAdd(data.SocketRegisterData.RoomNo, clientSkt); if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryAdd(data.SocketRegisterData.DevNo, clientSkt); LogUtil.Log("收到註冊包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo); //註冊反饋 ThreadHelper.Run(() => { lock (clientSkt.LockSend) { byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF"); SocketHelper.Send(clientSkt.Socket, bArrHeader); SocketHelper.Send(clientSkt.Socket, new byte[] { 0x05 }); } }); } } if (data.Type == 4) //收到返回值包 { ThreadHelper.Run(() => { if (data.SocketResult != null) clientSkt.CallbackDict.TryAdd(data.SocketResult.callbackId, data.SocketResult); if (ReceivedSocketResultEvent != null) { ReceivedSocketResultEvent(null, new Models.ReceivedSocketResultEventArgs(data.SocketResult)); } }); LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo); } } return data; } #endregion #region ResolveBuffer /// <summary> /// 解析位元組數組 /// </summary> private SocketData ResolveBuffer(List<byte> buffer, out int readLength) { SocketData socketData = null; readLength = 0; try { if (buffer.Count < 4) return null; byte[] bArrHeader = new byte[4]; CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length); readLength += bArrHeader.Length; string strHeader = Encoding.ASCII.GetString(bArrHeader); if (strHeader.ToUpper() == "0XFF") { if (buffer.Count < 5) return null; byte[] bArrType = new byte[1]; CopyTo(buffer, bArrType, 4, 0, bArrType.Length); readLength += bArrType.Length; byte bType = bArrType[0]; socketData = new SocketData(); socketData.Type = bType; if (socketData.Type == 2) { if (buffer.Count < 9) return null; byte[] bArrLength = new byte[4]; CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length); readLength += bArrLength.Length; int dataLength = BitConverter.ToInt32(bArrLength, 0); if (dataLength == 0 || buffer.Count < dataLength + 9) return null; byte[] dataBody = new byte[dataLength]; CopyTo(buffer, dataBody, 9, 0, dataBody.Length); readLength += dataBody.Length; string jsonString = Encoding.UTF8.GetString(dataBody); socketData.SocketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString); } if (socketData.Type == 4) { if (buffer.Count < 9) return null; byte[] bArrLength = new byte[4]; CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length); readLength += bArrLength.Length; int dataLength = BitConverter.ToInt32(bArrLength, 0); if (dataLength == 0 || buffer.Count < dataLength + 9) return null; byte[] dataBody = new byte[dataLength]; CopyTo(buffer, dataBody, 9, 0, dataBody.Length); readLength += dataBody.Length; string jsonString = Encoding.UTF8.GetString(dataBody); socketData.SocketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString); } } else { LogUtil.Error("不是0XFF"); return null; } } catch (Exception ex) { LogUtil.Error(ex, "解析位元組數組 出錯"); return null; } return socketData; } #endregion #region CopyTo /// <summary> /// 數組複製 /// </summary> private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length) { for (int i = 0; i < length; i++) { if (sourceIndex + i < bArrSource.Length) { listTarget.Add(bArrSource[sourceIndex + i]); } } } /// <summary> /// 數組複製 /// </summary> private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length) { for (int i = 0; i < length; i++) { if (targetIndex + i < bArrTarget.Length && sourceIndex + i < listSource.Count) { bArrTarget[targetIndex + i] = listSource[sourceIndex + i]; } } } #endregion #region 停止服務 /// <summary> /// 停止服務 /// </summary> public void StopServer() { try { foreach (ClientSocket clientSocket in clientSocketList.Keys.ToArray()) { Socket socket = clientSocket.Socket; ActionUtil.TryDoAction(() => { if (socket.Connected) socket.Disconnect(false); }); ActionUtil.TryDoAction(() => { socket.Close(); socket.Dispose(); }); } clientSocketList.Clear(); _dictDevNoClientSocket.Clear(); _dictRoomNoClientSocket.Clear(); if (serverSocket != null) { ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.Disconnect(false); }); ActionUtil.TryDoAction(() => { serverSocket.Close(); serverSocket.Dispose(); }); } LogUtil.Log("服務已停止"); } catch (Exception ex) { LogUtil.Error(ex, "停止服務出錯"); } } #endregion #region 釋放資源 /// <summary> /// 釋放資源 /// </summary> public void Dispose() { if (_checkClientTimer != null) { _checkClientTimer.Stop(); _checkClientTimer.Close(); } } #endregion #region Send /// <summary> /// Send 單個發送 並等待結果 /// </summary> /// <returns>false:發送失敗 true:發送成功,但接收端是否處理成功要等待返回結果</returns> public SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo) { SocketData data = new SocketData(); data.Type = 3; data.MsgContent = msgContent; ClientSocket clientSocket = null; if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket); if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket); if (clientSocket != null) { if (string.IsNullOrWhiteSpace(msgContent.callbackId)) { msgContent.callbackId = Guid.NewGuid().ToString("N"); } Send(clientSocket, data); return WaitSocketResult(clientSocket, msgContent.callbackId); } else { SocketResult socketResult = new SocketResult(); socketResult.success = false; socketResult.errorMsg = "客戶端不存在"; return socketResult; } } /// <summary> /// Send 單個發送 /// </summary> /// <returns>false:發送失敗 true:發送成功,但接收端是否處理成功要等待返回結果</returns> public void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null) { SocketData data = new SocketData(); data.Type = 3; data.MsgContent = msgContent; ClientSocket clientSocket = null; if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket); if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket); if (clientSocket != null) { if (string.IsNullOrWhiteSpace(msgContent.callbackId)) { msgContent.callbackId = Guid.NewGuid().ToString("N"); } if (callback != null) { WaitCallback(clientSocket, msgContent.callbackId, callback); } Send(clientSocket, data); } else { SocketResult socketResult = new SocketResult(); socketResult.success = false; socketResult.errorMsg = "客戶端不存在"; if (callback != null) callback(socketResult); } } /// <summary> /// 等待回調 /// </summary> private void WaitCallback(ClientSocket clientSocket, string callbackId, Action<SocketResult> callback = null) { DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout); System.Timers.Timer timer = new System.Timers.Timer(); timer.AutoReset = false; timer.Interval = 100; timer.Elapsed += (s, e) => { try { SocketResult socketResult; if (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt) { timer.Start(); return; } SocketResult sktResult; clientSocket.CallbackDict.TryRemove(callbackId, out sktResult); if (socketResult == null) { socketResult = new SocketResult(); socketResult.success = false; socketResult.errorMsg = "超時"; } if (callback != null) callback(socketResult); timer.Close(); } catch (Exception ex) { LogUtil.Error("WaitCallback error" + ex); } }; timer.Start(); } /// <summary> /// 等待SocketResult /// </summary> private SocketResult WaitSocketResult(ClientSocket clientSocket, string callbackId) { SocketResult socketResult; DateTime dt = DateTime.Now.AddSeconds(_WaitResultTimeout); while (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt) { Thread.Sleep(10); } SocketResult sktResult; clientSocket.CallbackDict.TryRemove(callbackId, out sktResult); if (socketResult == null) { socketResult = new SocketResult(); socketResult.success = false; socketResult.errorMsg = "超時"; } return socketResult; } /// <summary> /// Send /// </summary> /// <returns>false:發送失敗 true:發送成功,但不表示對方已收到</returns>