一個簡易socket通信結構

来源:https://www.cnblogs.com/assassinx/archive/2023/02/25/17154167.html
-Advertisement-
Play Games

服務端 工作需要又需要用到socketTCP通訊,這麼多年了,終於稍微能寫點了。讓我說其實也說不出個啥來,看了很多的非同步後稍微對非同步socket的導流 endreceive後 再beginreceive 形成一個內迴圈有了個認識,加上我自己的封包拆包機制,然後再仿那些其它的大多數代碼結構弄點onRe ...


服務端

工作需要又需要用到socketTCP通訊,這麼多年了,終於稍微能寫點了。讓我說其實也說不出個啥來,看了很多的非同步後稍微對非同步socket的導流 endreceive後 再beginreceive 形成一個內迴圈有了個認識,加上我自己的封包拆包機制,然後再仿那些其它的大多數代碼結構弄點onReceive事件進行 收包觸發。整個過程就算差不多了 ,基本上是能夠可靠運行的 靠譜的 中規中矩的,要說啥創新讀到嘛真的談不上。代碼中寫了很多low逼註釋 也是為了方便自己理解 請無視。下麵是server端代碼,使用非同步機制accept 非同步receive ,成員有 clients代表當前線上的客戶端 客戶端socket包裝為EndpointClient ,有onClientAddDel 代表客戶端上線掉線事件,有onReceive代表所有客戶端的收包事件,clients由於是非同步的多線程訪問就要涉及多線程管控 所以使用lock ,服務端有sendToAll() 和SendToSomeOne()毫無疑問這也是通過調用特定的clients來做的。

以下是服務端代碼

  1 public class MsgServerSchedule
  2 {
  3 
  4 
  5     Socket serverSocket;
  6     public Action<List<string>> onClientAddDel;
  7     public Action<Telegram_Base> onReceive;
  8     bool _isRunning = false;
  9 
 10     
 11     int port;
 12 
 13     public TelgramType telType;
 14 
 15     static List<EndpointClient> clients;
 16 
 17     public bool isRunning { get { return _isRunning; } }
 18     public MsgServerSchedule(int _port)
 19     {
 20         //any 就決定了 ip地址格式是v4
 21         //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
 22         //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 23 
 24         this.port = _port;
 25 
 26         clients = new List<EndpointClient>();
 27 
 28         Console.WriteLine("constructor");
 29 
 30     }
 31 
 32     public void Start()
 33     {
 34         try
 35         {
 36             IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
 37             serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 38             serverSocket.Bind(endPoint);
 39             serverSocket.Listen(port);
 40 
 41             serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
 42 
 43             _isRunning = true;
 44             Console.WriteLine("start");
 45         }
 46         catch (Exception ex)
 47         {
 48             _isRunning = false;
 49             serverSocket = null;
 50 
 51             Console.WriteLine("服務啟動出現錯誤,可能埠已被占用:"+port);
 52             Console.WriteLine(ex.Message);
 53         }
 54        
 55     }
 56 
 57     public void Stop()
 58     {
 59         for (int i = 0; i < clients.Count; i++)
 60         {
 61             clients[i].Close();                
 62         }
 63         ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
 64         serverSocket.Close();
 65         _isRunning = false;
 66     }
 67 
 68     public void SendToAll(Telegram_Base tel)
 69     {
 70         for (int i = 0; i < clients.Count; i++)
 71         {
 72             clients[i].Send(tel);
 73         }
 74     }
 75 
 76     public void SendToSomeOne(Telegram_Base tel)
 77     {
 78         for (int i = 0; i < clients.Count; i++)
 79         {
 80             if(clients[i].remoteIPPort==tel.remoteIPPort)
 81             {
 82                 clients[i].Send(tel);
 83                 break;
 84             }
 85         }
 86     }
 87 
 88     //新增與刪除客戶端 秉持原子操作
 89     List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
 90     {
 91         //非同步同時有新客戶端上線 與下線 不進行資源互斥訪問會報錯
 92         lock (this)
 93         {
 94             if (changeType == EndPointClientsChangeType.Add)
 95             {
 96                 clients.Add(cli);
 97             }
 98             else if(changeType== EndPointClientsChangeType.Del)
 99             {
100                 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
101                 if (beRemoveClient != null)
102                     clients.Remove(beRemoveClient);
103             }
104             else if(changeType== EndPointClientsChangeType.ClearAll)
105             {
106                 clients.Clear();
107             }
108             else if (changeType == EndPointClientsChangeType.GetAll)
109             {
110                 List<string> onLines = new List<string>(clients.Count);
111                 for (int i = 0; i < clients.Count; i++)
112                 {
113                     onLines.Add(clients[i].remoteIPPort);
114                 }
115                 return onLines;
116             }
117             else
118             {
119                 return null;
120             }
121         }
122         return null;
123     }
124     //非同步監聽客戶端 有客戶端到來時的回調
125     private void AcceptCallback(IAsyncResult iar)
126     {
127         //server端一直在receive 能夠感知到客戶端掉線 (連上後 關閉客戶端 server立即有錯誤爆出)
128         //但是同情況 關閉server端 客戶端無錯誤爆出 直到點發送 才有錯誤爆出
129         //由此得出 處於receive才會有掉線感知  ,send時發現發不出去自然也會有感知 跟人的正常思維理解是一樣的
130         //雖然tcp是所謂的長連接 ,通過反覆測試  ->但是雙方相互都處在一個靜止狀態 是無法 確定在不在的  
131         //連上後平常的情況下 並沒有數據流通 的 ,雙方只是一個狀態的保持而已。
132         //這也是為什麼 好多服務 客戶端 程式 都有個心跳機制(由此我們可以想到繼續完善 弄個客戶端列表 心跳超時的剔除列表 正常發消息send 或receive 異常的剔除列表 刪除clientSocket
133         //其實非要說吧 一般情況 服務端一直在receive 不用心跳其實也是可以的(客戶端可能是真的需要
134         //tcp底層就已經有了一個判斷對方在不在的機制 , 對方直接關程式 結束進程 這些 只要tcp在receive就立即能夠感知 所以說心跳 用不用看情況吧
135 
136         //tcp 不會丟包 哪怕是連接建立了   你還沒開始receive   對方卻先發了,
137         //對方只要是發了的數據 都由操作系統像個緩存樣給你放那的 不會掉 你再隔10秒開始receive都能rec的到
138 
139         //tcp甚至在拔掉網線 再重新插上 都可以保證數據一致性
140         //tcp的包順序能夠保證 先發的先到
141 
142         //nures代碼中那些beginreceivexxx  非同步receive的核心機制就是 ,假定數據到的時候把數據保存到xxx數組
143         //真正endreceive的時候 其實數據已經接收 處理完成了
144 
145         try
146         {
147 
148             //處理完當前accept
149             Socket currentSocket = serverSocket.EndAccept(iar);
150 
151             EndpointClient client = new EndpointClient(currentSocket,telType);
152 
153             //新增客戶端
154             ClientAddDelGetList(client, EndPointClientsChangeType.Add);
155             
156             if (onClientAddDel != null)
157             {
158                 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
159                 onClientAddDel(onlines);
160 
161                 //客戶端異常掉線
162                 client.onClientDel = new Action<string>((_remoteIPPort) =>
163                 {
164                     ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
165 
166                     List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
167                     onClientAddDel(onlines2);
168                 });
169             }
170 
171             
172 
173             //這句到時調用完成後 就會自動把 receivebuffer填充 //要接收的位元組數 系統底層操作一次接收多少位元組 其實意義不大
174             //總是從0開始(就是說併發時會覆蓋
175 
176             Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
177 
178             //currentSocket.Close();
179             //Application.Exit();
180 
181             //Thread.Sleep(1000 * 10);
182             client.onReceive += this.onReceive;
183 
184             client.BeginReceive();
185 
186 
187             //立即開始accept新的客戶端
188             if (isRunning == true && serverSocket != null)
189                 serverSocket.BeginAccept(AcceptCallback, serverSocket);
190             //beginAccept 最開始的方法可以不一樣 ,但最終肯定是一個不斷accept的閉環過程
191             //整個過程就像個導流樣 ,最開始用非同步導流到一個固定的點 然後讓其迴圈源源不斷運轉
192 
193             //加asynccallback 有什麼不一樣麽
194             //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
195 
196         }
197         catch (Exception ex)
198         {
199             Console.WriteLine("AcceptCallback Error");
200             Console.WriteLine(ex.Message);
201         }
202 
203     }
204 
205    
206 }

EndpointClient終端代碼代表客戶端的對口人,他的onReceive 等資源從服務端繼承 ,如果服務端想給某個特定客戶端發數據則會調用他們中的某一個 毫無疑問這是通過remoteIPport來判斷的,這些都是編寫基本socket結構輕車熟路的老套路

以下EndpointClient代碼

  1 public class EndpointClient
  2 {
  3     Socket workingSocket;
  4     static int receiveBufferLenMax = 5000;
  5     byte[] onceReadDatas = new byte[receiveBufferLenMax];
  6     List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
  7 
  8     public string remoteIPPort { get; set; }
  9     
 10     //當前殘留數據區 接收數據的起始指針(也代表緩衝區數據長度
 11     int receiveBufferLen = 0;
 12 
 13 
 14     TelgramType telType;
 15 
 16     public Action<Telegram_Base> onReceive;
 17     public Action<string> onClientDel;
 18 
 19     public EndpointClient()
 20     {
 21 
 22     }
 23     public EndpointClient(Socket _socket,TelgramType _telType)
 24     {
 25         this.remoteIPPort = _socket.RemoteEndPoint.ToString();
 26         this.telType = _telType;
 27         workingSocket = _socket;
 28     }
 29 
 30     public void Send(Telegram_Base tel)
 31     {
 32         //try
 33         //{
 34             if(workingSocket==null)
 35             {
 36                 Console.WriteLine("未初始化的EndpointClient");
 37                 return;
 38             }
 39             if (tel is Telegram_Schedule)
 40             {
 41                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
 42                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
 43                 {
 44                     Console.WriteLine("嘗試發送數據長度格式錯誤的報文");
 45                     return;
 46                 }
 47 
 48                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
 49                 byte[] sendbytes = telBeSend.dataBytes;
 50 
 51                 //數據超過緩衝區長度 會導致無法拆包
 52                 if (sendbytes.Length <= receiveBufferLenMax)
 53                 {
 54                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
 55                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
 56                     
 57                     );
 58                 }
 59                 else
 60                 {
 61                     Console.WriteLine("發送到調度客戶端的數據超過緩衝區長度");
 62                     throw new Exception("發送到調度客戶端的數據超過緩衝區長度");
 63                 }
 64 
 65             }
 66             else if (tel is Telegram_SDBMsg)
 67             {
 68 
 69             }
 70 
 71         //}
 72         //catch (Exception ex)
 73         //{
 74 
 75         //    Console.WriteLine(ex.Message);
 76         //    throw ex;
 77         //}
 78     }
 79 
 80     public void BeginReceive()
 81     {
 82         if (workingSocket == null)
 83         {
 84             Console.WriteLine("未初始化的EndpointClient");
 85             return;
 86         }
 87 
 88         receiveBufferLen = 0;
 89         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
 90             ReceiveCallback,
 91         this);
 92     }
 93     private void ReceiveCallback(IAsyncResult iar)
 94     {
 95         try
 96         {
 97             EndpointClient cli = (EndpointClient)iar.AsyncState;
 98             Socket socket = cli.workingSocket;
 99             int reads = socket.EndReceive(iar);
100 
101             if (reads > 0)
102             {
103 
104                 for (int i = 0; i < reads; i++)
105                 {
106                     receiveBuffer.Add(onceReadDatas[i]);
107                 }
108 
109                 //具體填充了多少看返回值 此時 數據已經在buffer中了
110                 receiveBufferLen += reads;
111                 //加完了後解析 阻塞式處理 結束後開啟新的接收
112                 SloveTelData();
113 
114                 if (receiveBufferLenMax - receiveBufferLen > 0)
115                 {
116                     //接收完了 繼續beginreceive 開啟非同步的下次接收 (如果緩衝區有殘留數據 則接收長度變短 ,沒接收到的讓其留在socket不會丟失 下次接收)
117                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
118                 }
119                 else//阻塞式處理都完成一遍了 都還沒清理出任何緩衝區空間 毫無疑問 整體運轉機制已經掛了 不用beginreceive下一次了
120                 {
121                     Close();
122                     //移除自己
123                     if (onClientDel != null)
124                     {
125                         onClientDel(remoteIPPort);
126                     }
127                     Console.WriteLine("服務端介面解析數據出現異常");
128                     throw new Exception("服務端介面解析數據出現異常");
129                 }
130             }
131             else//reads==0 客戶端已關閉
132             {
133                 Close();
134                 //移除自己
135                 if (onClientDel != null)
136                 {
137                     onClientDel(remoteIPPort);
138                 }
139             }
140         }
141         catch (Exception ex)
142         {
143             Close();
144             //移除自己
145             if (onClientDel != null)
146             {
147                 onClientDel(remoteIPPort);
148             }
149 
150             Console.WriteLine("ReceiveCallback Error");
151             Console.WriteLine(ex.Message);
152         }
153 
154     }
155     void SloveTelData()
156     {
157         //進行數據解析
158         SloveTelDataUtil slo = new SloveTelDataUtil();
159         
160         if (telType == TelgramType.Schedule)
161         {
162             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
163             //buffer已經被處理一遍了 使用新的長度
164             receiveBufferLen = receiveBuffer.Count;
165             //解析出的每一個對象都觸發 onreceive
166             for (int i = 0; i < dataEntitys.Count; i++)
167             {
168                 if (onReceive != null)
169                     onReceive(dataEntitys[i]);
170             }
171         }
172         else if (telType == TelgramType.SDBMsg)
173         {
174 
175         }
176 
177     }
178 
179    
180     public void Close()
181     {
182         try
183         {
184             receiveBuffer.Clear();
185             receiveBufferLen = 0;
186             if (workingSocket != null && workingSocket.Connected)
187                 workingSocket.Close();
188         }
189         catch (Exception ex)
190         {
191             Console.WriteLine(ex.Message);
192         }
193         
194     }
195 }

數據拆包與封包粘包處理

上面的代碼可以看到 數據包處理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包後從緩衝區移除 解析多少個包觸發多少次事件,殘餘數據留在緩衝區 然後繼續開始新的beginReceive往緩衝區加。在非同步機制中 到達endReceive的時候數據已經在緩衝區里了,這個自不用多說噻。數據包和粘包邏輯在公共類庫里供客戶端服務端共同調用

以下是粘包處理邏輯

  1 public class SloveTelDataUtil
  2 {
  3     List<Telegram_Schedule> solveList;

              
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 哈嘍兄弟,今天給大家分享一個Python tkinter製作法律查詢小工具。 光爬蟲大家也只能自己用用,就算打包了exe,運行也不好看,那我們直接寫成界面。 當我們想瞭解法律,但是又不想買書的時候,就可以用這個小工具,慢慢查詢瞭解。 當然,如果想整的比較有意思,就得靠大家自己來完善實現了。 效果展示 ...
  • 原生的API&註解方式 1.MyBatis原生的API調用 1.1原生API快速入門 需求:在前面的項目基礎上,使用MyBatis原生的API完成,即直接通過SqlSession介面的方法來完成操作 1.2代碼實現 MyBatisNativeTest.java,演示原生的API操作 其實底層還是使用 ...
  • 一、什麼是模塊? 模塊是一系列功能的集合體,而函數是某一個功能的集合體,因此模塊可以看成是一堆函數的集合體。一個py文件內部就可以放一堆函數,因此一個py文件就可以看成一個模塊。如果這個py文件的文件名為module.py,模塊名則是module。 二、模塊的四種形式 在Python中,總共有以下四 ...
  • 一、單元測試框架簡介 1. 什麼是單元測試 單元測試是指在軟體開發過程中,針對軟體的最小單位(函數,方法)進行正確性的檢查測試。 2. 常用單元測試框架 2.1 Java 類別 junit testng 2.2 Python 類別 unittest pytest 3. 單元測試框架主要作用 測試發現 ...
  • 【深進1.例1】求區間和 題目描述 給定 $n$ 個正整數組成的數列 $a_1, a_2, \cdots, a_n$ 和 $m$ 個區間 $[l_i,r_i]$,分別求這 $m$ 個區間的區間和。 輸入格式 共 $n+m+2$ 行。 第一行,為一個正整數 $n$ 。 第二行,為 $n$ 個正整數 $ ...
  • 原創:扣釘日記(微信公眾號ID:codelogs),歡迎分享,非公眾號轉載保留此聲明。 簡介 日常編程工作中,Java集合會經常被使用到,且經常需要對集合做一些類似過濾、排序、對象轉換之類的操作。 為了簡化這類操作,Java8添加了一套新的Stream API,使用方式就像寫SQL一樣,大大簡化了這 ...
  • 這篇文章主要描述分散式數據存儲系統中的數據分片方法,包括哈希方法、一致性哈希方法、帶有限負載的一致性哈希方法以及帶虛擬節點的一致性哈希方法。 ...
  • Lambda 表達式以及方法引用 Java 8 的新特性筆記,重點講的是: Lambda 函數式介面 方法引用 Steam 流 Lambda 表達式 Lambda 的基礎使用不記錄,記錄 JDK 8 實戰 書上的一些底層和核心筆記。 行為參數化 一個貫徹 Lambda 表達式的一個模式、編程規範。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...