服務端 工作需要又需要用到socketTCP通訊,這麼多年了,終於稍微能寫點了。讓我說其實也說不出個啥來,看了很多的非同步後稍微對非同步socket的導流 endreceive後 再beginreceive 形成一個內迴圈有了個認識,加上我自己的封包拆包機制,然後再仿那些其它的大多數代碼結構弄點onRe ...
服務端
工作需要又需要用到socketTCP通訊,這麼多年了,終於稍微能寫點了。讓我說其實也說不出個啥來,看了很多的非同步後稍微對非同步socket的導流 endreceive後 再beginreceive 形成一個內迴圈有了個認識,加上我自己的封包拆包機制,然後再仿那些其它的大多數代碼結構弄點onReceive事件進行 收包觸發。整個過程就算差不多了 ,基本上是能夠可靠運行的 靠譜的 中規中矩的,要說啥創新讀到嘛真的談不上。代碼中寫了很多low逼註釋 也是為了方便自己理解 請無視。下麵是server端代碼,使用非同步機制accept 非同步receive ,成員有 clients代表當前線上的客戶端 客戶端socket包裝為EndpointClient ,有onClientAddDel 代表客戶端上線掉線事件,有onReceive代表所有客戶端的收包事件,clients由於是非同步的多線程訪問就要涉及多線程管控 所以使用lock ,服務端有sendToAll() 和SendToSomeOne()毫無疑問這也是通過調用特定的clients來做的。
以下是服務端代碼
1 public class MsgServerSchedule 2 { 3 4 5 Socket serverSocket; 6 public Action<List<string>> onClientAddDel; 7 public Action<Telegram_Base> onReceive; 8 bool _isRunning = false; 9 10 11 int port; 12 13 public TelgramType telType; 14 15 static List<EndpointClient> clients; 16 17 public bool isRunning { get { return _isRunning; } } 18 public MsgServerSchedule(int _port) 19 { 20 //any 就決定了 ip地址格式是v4 21 //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654); 22 //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 23 24 this.port = _port; 25 26 clients = new List<EndpointClient>(); 27 28 Console.WriteLine("constructor"); 29 30 } 31 32 public void Start() 33 { 34 try 35 { 36 IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port); 37 serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 38 serverSocket.Bind(endPoint); 39 serverSocket.Listen(port); 40 41 serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket); 42 43 _isRunning = true; 44 Console.WriteLine("start"); 45 } 46 catch (Exception ex) 47 { 48 _isRunning = false; 49 serverSocket = null; 50 51 Console.WriteLine("服務啟動出現錯誤,可能埠已被占用:"+port); 52 Console.WriteLine(ex.Message); 53 } 54 55 } 56 57 public void Stop() 58 { 59 for (int i = 0; i < clients.Count; i++) 60 { 61 clients[i].Close(); 62 } 63 ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll); 64 serverSocket.Close(); 65 _isRunning = false; 66 } 67 68 public void SendToAll(Telegram_Base tel) 69 { 70 for (int i = 0; i < clients.Count; i++) 71 { 72 clients[i].Send(tel); 73 } 74 } 75 76 public void SendToSomeOne(Telegram_Base tel) 77 { 78 for (int i = 0; i < clients.Count; i++) 79 { 80 if(clients[i].remoteIPPort==tel.remoteIPPort) 81 { 82 clients[i].Send(tel); 83 break; 84 } 85 } 86 } 87 88 //新增與刪除客戶端 秉持原子操作 89 List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType) 90 { 91 //非同步同時有新客戶端上線 與下線 不進行資源互斥訪問會報錯 92 lock (this) 93 { 94 if (changeType == EndPointClientsChangeType.Add) 95 { 96 clients.Add(cli); 97 } 98 else if(changeType== EndPointClientsChangeType.Del) 99 { 100 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort); 101 if (beRemoveClient != null) 102 clients.Remove(beRemoveClient); 103 } 104 else if(changeType== EndPointClientsChangeType.ClearAll) 105 { 106 clients.Clear(); 107 } 108 else if (changeType == EndPointClientsChangeType.GetAll) 109 { 110 List<string> onLines = new List<string>(clients.Count); 111 for (int i = 0; i < clients.Count; i++) 112 { 113 onLines.Add(clients[i].remoteIPPort); 114 } 115 return onLines; 116 } 117 else 118 { 119 return null; 120 } 121 } 122 return null; 123 } 124 //非同步監聽客戶端 有客戶端到來時的回調 125 private void AcceptCallback(IAsyncResult iar) 126 { 127 //server端一直在receive 能夠感知到客戶端掉線 (連上後 關閉客戶端 server立即有錯誤爆出) 128 //但是同情況 關閉server端 客戶端無錯誤爆出 直到點發送 才有錯誤爆出 129 //由此得出 處於receive才會有掉線感知 ,send時發現發不出去自然也會有感知 跟人的正常思維理解是一樣的 130 //雖然tcp是所謂的長連接 ,通過反覆測試 ->但是雙方相互都處在一個靜止狀態 是無法 確定在不在的 131 //連上後平常的情況下 並沒有數據流通 的 ,雙方只是一個狀態的保持而已。 132 //這也是為什麼 好多服務 客戶端 程式 都有個心跳機制(由此我們可以想到繼續完善 弄個客戶端列表 心跳超時的剔除列表 正常發消息send 或receive 異常的剔除列表 刪除clientSocket 133 //其實非要說吧 一般情況 服務端一直在receive 不用心跳其實也是可以的(客戶端可能是真的需要 134 //tcp底層就已經有了一個判斷對方在不在的機制 , 對方直接關程式 結束進程 這些 只要tcp在receive就立即能夠感知 所以說心跳 用不用看情況吧 135 136 //tcp 不會丟包 哪怕是連接建立了 你還沒開始receive 對方卻先發了, 137 //對方只要是發了的數據 都由操作系統像個緩存樣給你放那的 不會掉 你再隔10秒開始receive都能rec的到 138 139 //tcp甚至在拔掉網線 再重新插上 都可以保證數據一致性 140 //tcp的包順序能夠保證 先發的先到 141 142 //nures代碼中那些beginreceivexxx 非同步receive的核心機制就是 ,假定數據到的時候把數據保存到xxx數組 143 //真正endreceive的時候 其實數據已經接收 處理完成了 144 145 try 146 { 147 148 //處理完當前accept 149 Socket currentSocket = serverSocket.EndAccept(iar); 150 151 EndpointClient client = new EndpointClient(currentSocket,telType); 152 153 //新增客戶端 154 ClientAddDelGetList(client, EndPointClientsChangeType.Add); 155 156 if (onClientAddDel != null) 157 { 158 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 159 onClientAddDel(onlines); 160 161 //客戶端異常掉線 162 client.onClientDel = new Action<string>((_remoteIPPort) => 163 { 164 ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del); 165 166 List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 167 onClientAddDel(onlines2); 168 }); 169 } 170 171 172 173 //這句到時調用完成後 就會自動把 receivebuffer填充 //要接收的位元組數 系統底層操作一次接收多少位元組 其實意義不大 174 //總是從0開始(就是說併發時會覆蓋 175 176 Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString())); 177 178 //currentSocket.Close(); 179 //Application.Exit(); 180 181 //Thread.Sleep(1000 * 10); 182 client.onReceive += this.onReceive; 183 184 client.BeginReceive(); 185 186 187 //立即開始accept新的客戶端 188 if (isRunning == true && serverSocket != null) 189 serverSocket.BeginAccept(AcceptCallback, serverSocket); 190 //beginAccept 最開始的方法可以不一樣 ,但最終肯定是一個不斷accept的閉環過程 191 //整個過程就像個導流樣 ,最開始用非同步導流到一個固定的點 然後讓其迴圈源源不斷運轉 192 193 //加asynccallback 有什麼不一樣麽 194 //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket); 195 196 } 197 catch (Exception ex) 198 { 199 Console.WriteLine("AcceptCallback Error"); 200 Console.WriteLine(ex.Message); 201 } 202 203 } 204 205 206 }
EndpointClient終端代碼代表客戶端的對口人,他的onReceive 等資源從服務端繼承 ,如果服務端想給某個特定客戶端發數據則會調用他們中的某一個 毫無疑問這是通過remoteIPport來判斷的,這些都是編寫基本socket結構輕車熟路的老套路
以下EndpointClient代碼
1 public class EndpointClient 2 { 3 Socket workingSocket; 4 static int receiveBufferLenMax = 5000; 5 byte[] onceReadDatas = new byte[receiveBufferLenMax]; 6 List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax); 7 8 public string remoteIPPort { get; set; } 9 10 //當前殘留數據區 接收數據的起始指針(也代表緩衝區數據長度 11 int receiveBufferLen = 0; 12 13 14 TelgramType telType; 15 16 public Action<Telegram_Base> onReceive; 17 public Action<string> onClientDel; 18 19 public EndpointClient() 20 { 21 22 } 23 public EndpointClient(Socket _socket,TelgramType _telType) 24 { 25 this.remoteIPPort = _socket.RemoteEndPoint.ToString(); 26 this.telType = _telType; 27 workingSocket = _socket; 28 } 29 30 public void Send(Telegram_Base tel) 31 { 32 //try 33 //{ 34 if(workingSocket==null) 35 { 36 Console.WriteLine("未初始化的EndpointClient"); 37 return; 38 } 39 if (tel is Telegram_Schedule) 40 { 41 Telegram_Schedule telBeSend = tel as Telegram_Schedule; 42 if (telBeSend.dataBytes.Length != telBeSend.dataLen) 43 { 44 Console.WriteLine("嘗試發送數據長度格式錯誤的報文"); 45 return; 46 } 47 48 byte[] sendBytesHeader = telBeSend.dataBytesHeader; 49 byte[] sendbytes = telBeSend.dataBytes; 50 51 //數據超過緩衝區長度 會導致無法拆包 52 if (sendbytes.Length <= receiveBufferLenMax) 53 { 54 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null); 55 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null 56 57 ); 58 } 59 else 60 { 61 Console.WriteLine("發送到調度客戶端的數據超過緩衝區長度"); 62 throw new Exception("發送到調度客戶端的數據超過緩衝區長度"); 63 } 64 65 } 66 else if (tel is Telegram_SDBMsg) 67 { 68 69 } 70 71 //} 72 //catch (Exception ex) 73 //{ 74 75 // Console.WriteLine(ex.Message); 76 // throw ex; 77 //} 78 } 79 80 public void BeginReceive() 81 { 82 if (workingSocket == null) 83 { 84 Console.WriteLine("未初始化的EndpointClient"); 85 return; 86 } 87 88 receiveBufferLen = 0; 89 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None, 90 ReceiveCallback, 91 this); 92 } 93 private void ReceiveCallback(IAsyncResult iar) 94 { 95 try 96 { 97 EndpointClient cli = (EndpointClient)iar.AsyncState; 98 Socket socket = cli.workingSocket; 99 int reads = socket.EndReceive(iar); 100 101 if (reads > 0) 102 { 103 104 for (int i = 0; i < reads; i++) 105 { 106 receiveBuffer.Add(onceReadDatas[i]); 107 } 108 109 //具體填充了多少看返回值 此時 數據已經在buffer中了 110 receiveBufferLen += reads; 111 //加完了後解析 阻塞式處理 結束後開啟新的接收 112 SloveTelData(); 113 114 if (receiveBufferLenMax - receiveBufferLen > 0) 115 { 116 //接收完了 繼續beginreceive 開啟非同步的下次接收 (如果緩衝區有殘留數據 則接收長度變短 ,沒接收到的讓其留在socket不會丟失 下次接收) 117 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this); 118 } 119 else//阻塞式處理都完成一遍了 都還沒清理出任何緩衝區空間 毫無疑問 整體運轉機制已經掛了 不用beginreceive下一次了 120 { 121 Close(); 122 //移除自己 123 if (onClientDel != null) 124 { 125 onClientDel(remoteIPPort); 126 } 127 Console.WriteLine("服務端介面解析數據出現異常"); 128 throw new Exception("服務端介面解析數據出現異常"); 129 } 130 } 131 else//reads==0 客戶端已關閉 132 { 133 Close(); 134 //移除自己 135 if (onClientDel != null) 136 { 137 onClientDel(remoteIPPort); 138 } 139 } 140 } 141 catch (Exception ex) 142 { 143 Close(); 144 //移除自己 145 if (onClientDel != null) 146 { 147 onClientDel(remoteIPPort); 148 } 149 150 Console.WriteLine("ReceiveCallback Error"); 151 Console.WriteLine(ex.Message); 152 } 153 154 } 155 void SloveTelData() 156 { 157 //進行數據解析 158 SloveTelDataUtil slo = new SloveTelDataUtil(); 159 160 if (telType == TelgramType.Schedule) 161 { 162 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort); 163 //buffer已經被處理一遍了 使用新的長度 164 receiveBufferLen = receiveBuffer.Count; 165 //解析出的每一個對象都觸發 onreceive 166 for (int i = 0; i < dataEntitys.Count; i++) 167 { 168 if (onReceive != null) 169 onReceive(dataEntitys[i]); 170 } 171 } 172 else if (telType == TelgramType.SDBMsg) 173 { 174 175 } 176 177 } 178 179 180 public void Close() 181 { 182 try 183 { 184 receiveBuffer.Clear(); 185 receiveBufferLen = 0; 186 if (workingSocket != null && workingSocket.Connected) 187 workingSocket.Close(); 188 } 189 catch (Exception ex) 190 { 191 Console.WriteLine(ex.Message); 192 } 193 194 } 195 }
數據拆包與封包粘包處理
上面的代碼可以看到 數據包處理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包後從緩衝區移除 解析多少個包觸發多少次事件,殘餘數據留在緩衝區 然後繼續開始新的beginReceive往緩衝區加。在非同步機制中 到達endReceive的時候數據已經在緩衝區里了,這個自不用多說噻。數據包和粘包邏輯在公共類庫里供客戶端服務端共同調用
以下是粘包處理邏輯
1 public class SloveTelDataUtil 2 { 3 List<Telegram_Schedule> solveList;