一、前言 時間過得真是快,轉眼就2018年了。首先祝各位博友,軟體開發者新年新氣象,事業有成,身體健康,闔家幸福!最近看到園子里好多關於自己的2017年度總結以及對自己新一年的願景,覺得咱園子的氛圍是真的好。這三天假期我也沒閑著,一邊看OB海鮮團吃雞一邊寫Socket SocketAsyncEven ...
一、前言
時間過得真是快,轉眼就2018年了。首先祝各位博友,軟體開發者新年新氣象,事業有成,身體健康,闔家幸福!最近看到園子里好多關於自己的2017年度總結以及對自己新一年的願景,覺得咱園子的氛圍是真的好。這三天假期我也沒閑著,一邊看OB海鮮團吃雞一邊寫Socket SocketAsyncEventArgs 代碼。我上一篇博客已經用APM的方式實現了客戶端與伺服器端的Socket通信,並具有了一定的併發能力。所以這三天我就決定對伺服器代碼進行改造,使用MS在4.0時發佈的SocketAsyncEventArgs(SAEA)寫法。為了方便的進行伺服器端兩種寫法的對比,我客戶端的代碼沒有進行變化,依然使用APM方式。代碼已經上傳至Github,鏈接會在文末貼出。
二、我的業務功能
我的業務功能依然是實現從伺服器多線程下載更新文件。下載之前的那些操作我基本就不講了,上一篇博文里的都有,本文還是回到Socket下載文件上。具體流程如下:
在我寫SAEA代碼之前,我仔細搜了一下網上的資源:MSDN、CNBLOG、CSDN、CodeProject。這四種來源的代碼示例的主要流程是這樣的:
對比我的流程,您會發現少了一半的通信過程。客戶端的代碼好寫,但是伺服器端如何發送完數據之後再接收數據?這中間的銜接過程還是有點門道的。特別是SAEA的代碼採用了Buffer池化以及SAEA池化之後,裡面有些小的細節就要想清楚了。下麵就是具體的代碼,我會以我自己的視角去論述APM與SAEA到底有什麼區別。
三、對比
其實對於伺服器端的APM,我覺得最重要的並不是代碼中的BeginXXX或者是EndXXX,因為這就是APM寫法的特征,BeginXXX或者EndXXX然後裡面有一個回調函數,在回調函數里去做一些業務上的事情。最重要的是要有一個線程等待的概念,也就是代碼中的ManualResetEvent這個東西,它就像地鐵閘機一樣,處理好一個再放一個進去。APM寫法的好處是顯而易見的,就是代碼看起來十分的簡單。缺點依照MS的說法就是如果有過多的客-服交流,可能會產生較多的IAsyncResult對象,這樣會增加伺服器的開銷。
伺服器端的APM寫法:
1 using System; 2 using System.IO; 3 using System.Linq; 4 using System.Net; 5 using System.Net.Sockets; 6 using System.Threading; 7 using UpdaterShare.GlobalSetting; 8 using UpdaterShare.Model; 9 using UpdaterShare.Utility; 10 11 namespace UpdaterServerAPM 12 { 13 public static class ServerSocket 14 { 15 private static int _downloadChannelsCount; 16 private static string _serverPath; 17 private static readonly ManualResetEvent AllDone = new ManualResetEvent(false); 18 19 public static void StartServer(int port, int backlog) 20 { 21 _downloadChannelsCount = DownloadSetting.DownloadChannelsCount; 22 try 23 { 24 IPAddress ipAddress = IPAddress.Any; 25 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port); 26 Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 27 listener.Bind(localEndPoint); 28 listener.Listen(backlog); 29 30 while (true) 31 { 32 AllDone.Reset(); 33 listener.BeginAccept(AcceptCallback, listener); 34 AllDone.WaitOne(); 35 } 36 } 37 catch (Exception ex) 38 { 39 var path = $"{AppDomain.CurrentDomain.BaseDirectory}\\RunLog.txt"; 40 File.AppendAllText(path, ex.Message); 41 } 42 } 43 44 45 private static void AcceptCallback(IAsyncResult ar) 46 { 47 AllDone.Set(); 48 Socket listener = (Socket)ar.AsyncState; 49 Socket handler = listener.EndAccept(ar); 50 ComObject state = new ComObject { WorkSocket = handler }; 51 handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, FindUpdateFileCallback, state); 52 } 53 54 55 private static void FindUpdateFileCallback(IAsyncResult ar) 56 { 57 ComObject state = (ComObject)ar.AsyncState; 58 Socket handler = state.WorkSocket; 59 int bytesRead = handler.EndReceive(ar); 60 if (bytesRead > 0) 61 { 62 var receiveData = state.Buffer.Take(bytesRead).ToArray(); 63 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag()); 64 if (dataList != null && dataList.Any()) 65 { 66 var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault()); 67 string str = System.Text.Encoding.UTF8.GetString(request); 68 var infos = str.Split('_'); 69 var productName = infos[0]; 70 var revitVersion = infos[1]; 71 var currentVersion = infos[2]; 72 73 var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile"); 74 var serverFileFolder = Path.Combine(mainFolder, "Server"); 75 var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles(); 76 77 var updatefile = serverFileFiles.FirstOrDefault(x=>x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion)); 78 if (updatefile != null) 79 { 80 if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return; 81 _serverPath = updatefile.FullName; 82 FoundUpdateFileResponse(handler); 83 } 84 } 85 } 86 } 87 88 89 private static void FoundUpdateFileResponse(Socket handler) 90 { 91 byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(),null); 92 ComObject state = new ComObject { WorkSocket = handler }; 93 handler.BeginSend(foundUpdateFileData, 0, foundUpdateFileData.Length, 0, HasFoundUpdateFileCallback, state); 94 } 95 96 97 private static void HasFoundUpdateFileCallback(IAsyncResult ar) 98 { 99 ComObject state = (ComObject)ar.AsyncState; 100 Socket handler = state.WorkSocket; 101 handler.EndSend(ar); 102 handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, ReadFilePositionRequestCallback, state); 103 } 104 105 106 private static void ReadFilePositionRequestCallback(IAsyncResult ar) 107 { 108 ComObject state = (ComObject)ar.AsyncState; 109 Socket handler = state.WorkSocket; 110 int bytesRead = handler.EndReceive(ar); 111 if (bytesRead > 0) 112 { 113 var receiveData = state.Buffer.Take(bytesRead).ToArray(); 114 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag()); 115 if (dataList != null) 116 { 117 foreach (var request in dataList) 118 { 119 if (PacketUtils.IsPacketComplete(request)) 120 { 121 int startPosition = PacketUtils.GetRequestFileStartPosition(request); 122 SendFileResponse(handler, startPosition); 123 } 124 } 125 } 126 } 127 } 128 129 private static void SendFileResponse(Socket handler, int startPosition) 130 { 131 var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount); 132 if (packetSize != 0) 133 { 134 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize); 135 byte[] packetNumber = BitConverter.GetBytes(startPosition/packetSize); 136 if (filedata != null) 137 { 138 byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber); 139 ComObject state = new ComObject {WorkSocket = handler}; 140 handler.BeginSend(segmentedFileResponseData, 0, segmentedFileResponseData.Length, 0, SendFileResponseCallback, state); 141 } 142 } 143 else 144 { 145 handler.Shutdown(SocketShutdown.Both); 146 handler.Close(); 147 } 148 } 149 150 151 private static void SendFileResponseCallback(IAsyncResult ar) 152 { 153 try 154 { 155 ComObject state = (ComObject)ar.AsyncState; 156 Socket handler = state.WorkSocket; 157 handler.EndSend(ar); 158 handler.Shutdown(SocketShutdown.Both); 159 handler.Close(); 160 } 161 catch (Exception e) 162 { 163 164 } 165 } 166 } 167 }
說到SAEA,我覺得初入的小伙伴一定要先看MSDN上的實例,特別是它的BufferManager以及SocketAsyncEventArgsPool是怎麼寫的,到底是乾什麼用的。這裡我可以簡單的說下:SocketAsyncEventArgsPool是用來存放SAEA對象的,其個數依賴於你伺服器所能承擔的隊列長度,比如說我伺服器能承擔100個客戶的等待,我就在伺服器端生成100個SAEA對象放在池子里,當有客戶來連接時,我從池子里取出一個來和他對接。客戶走了,我再扔到池子里去。BufferManager則是對池子里的SAEA對象進行Buffer分配的,也相當於一個池子,這個池子的大小是隊列長度*通信緩存長度*2,乘以2是因為讀與寫是分開的。通信緩存長度很好理解,客戶端要傳個2G的信息給伺服器端不可能一下子接收2G,肯定是一口一口吃,那麼這一口的大小就是通信緩存長度。那麼分配給每個SAEA的緩存是多大呢?當然就是通信緩存長度的大小咯。註意!!註意!!註意!!既然是池化了,所有關於Buffer的操作都要圍繞分配給SAEA的Buffer去操作!見148-149行。當伺服器拿著分配到的Buffer去接收信息後,如果再要發送信息,所要做的第一件事就是先清空分配的Buffer再使用,BufferManager給你分配哪段你就用哪段,別使用錯了。有幾個參數需要註意:e.Offset(偏移),e.Count(大小),e.Buffer(緩存位元組數組), e.BytesTransferred(通信傳輸的位元組長度)。如果伺服器端要發送數據,一定要用Array.Copy將信息寫入對應分配的Buffer中。
說完池化,接著就是寫法上的小區別,我覺得區別並不大,無非就是委托換了個寫法。當然還要判斷下是否為非同步操作,如果是否則需要進行同步操作,見82-85行代碼。
伺服器的SAEA寫法:
1 using System; 2 using System.IO; 3 using System.Linq; 4 using System.Net; 5 using System.Net.Sockets; 6 using System.Threading; 7 using UpdaterShare.GlobalSetting; 8 using UpdaterShare.Model; 9 using UpdaterShare.Utility; 10 11 namespace UpdaterServerSAEA 12 { 13 public class ServerSocket 14 { 15 private readonly int _port; 16 private readonly int _backlog; 17 private Socket _listenSocket; 18 private const int _opsToPreAlloc = 2; 19 private readonly BufferManager _bufferManager; 20 private readonly SocketAsyncEventArgsPool _readWritePool; 21 private readonly Semaphore _maxNumberAcceptedClients; 22 23 private string _serverPath; 24 private static readonly int _downloadChannelsCount = DownloadSetting.DownloadChannelsCount; 25 26 public ServerSocket(int port, int backlog) 27 { 28 _port = port; 29 _backlog = backlog; 30 31 _bufferManager = new BufferManager(ComObject.BufferSize * backlog * _opsToPreAlloc, ComObject.BufferSize); 32 _readWritePool = new SocketAsyncEventArgsPool(backlog); 33 _maxNumberAcceptedClients = new Semaphore(backlog, backlog); 34 } 35 36 37 private void Init() 38 { 39 _bufferManager.InitBuffer(); 40 41 for (var i = 0; i < _backlog; i++) 42 { 43 var readWriteEventArg = new SocketAsyncEventArgs(); 44 _bufferManager.SetBuffer(readWriteEventArg); 45 _readWritePool.Push(readWriteEventArg); 46 } 47 } 48 49 50 public void StartServer() 51 { 52 try 53 { 54 Init(); 55 IPAddress ipAddress = IPAddress.Any; 56 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port); 57 _listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 58 _listenSocket.Bind(localEndPoint); 59 _listenSocket.Listen(_backlog); 60 StartAccept(null); 61 } 62 catch (Exception ex) 63 { 64 Console.WriteLine(ex.Message); 65 } 66 } 67 68 private void StartAccept(SocketAsyncEventArgs acceptEventArg) 69 { 70 if (acceptEventArg == null) 71 { 72 acceptEventArg = new SocketAsyncEventArgs(); 73 acceptEventArg.Completed += StartAccept_Completed; 74 } 75 else 76 { 77 acceptEventArg.AcceptSocket = null; 78 } 79 80 _maxNumberAcceptedClients.WaitOne(); 81 82 if (!_listenSocket.AcceptAsync(acceptEventArg)) 83 { 84 ProcessAccept(acceptEventArg); 85 } 86 } 87 88 private void StartAccept_Completed(object sender, SocketAsyncEventArgs e) 89 { 90 ProcessAccept(e); 91 } 92 93 94 private void ProcessAccept(SocketAsyncEventArgs e) 95 { 96 if (e.SocketError == SocketError.Success) 97 { 98 var socket = e.AcceptSocket; 99 if (socket.Connected) 100 { 101 SocketAsyncEventArgs readEventArgs = _readWritePool.Pop(); 102 readEventArgs.AcceptSocket = socket; 103 readEventArgs.Completed += ProcessAccept_Completed; 104 if (!socket.ReceiveAsync(readEventArgs)) 105 { 106 ProcessReceiveFindFileRequest(readEventArgs); 107 } 108 StartAccept(e); 109 } 110 } 111 } 112 113 private void ProcessAccept_Completed(object sender, SocketAsyncEventArgs e) 114 { 115 ProcessReceiveFindFileRequest(e); 116 } 117 118 119 private void ProcessReceiveFindFileRequest(SocketAsyncEventArgs e) 120 { 121 var bytesRead = e.BytesTransferred; 122 if (bytesRead > 0 && e.SocketError == SocketError.Success) 123 { 124 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray(); 125 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag()); 126 if (dataList != null && dataList.Any()) 127 { 128 var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault()); 129 string str = System.Text.Encoding.UTF8.GetString(request); 130 var infos = str.Split('_'); 131 var productName = infos[0]; 132 var revitVersion = infos[1]; 133 var currentVersion = infos[2]; 134 135 var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile"); 136 var serverFileFolder = Path.Combine(mainFolder, "Server"); 137 var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles(); 138 139 var updatefile = serverFileFiles.FirstOrDefault(x => x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion)); 140 if (updatefile != null) 141 { 142 if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return; 143 _serverPath = updatefile.FullName; 144 145 //ready to send back to Client 146 byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(), null); 147 148 Array.Clear(e.Buffer, e.Offset, e.Count); 149 Array.Copy(foundUpdateFileData, 0, e.Buffer, e.Offset, foundUpdateFileData.Length); 150 151 e.Completed -= ProcessAccept_Completed; 152 e.Completed += ProcessReceiveFindFileRequest_Completed; 153 154 if (!e.AcceptSocket.SendAsync(e)) 155 { 156 ProcessFilePosition(e); 157 } 158 } 159 } 160 } 161 } 162 163 164 private void ProcessReceiveFindFileRequest_Completed(object sender, SocketAsyncEventArgs e) 165 { 166 ProcessFilePosition(e); 167 } 168 169 170 private void ProcessFilePosition(SocketAsyncEventArgs e) 171 { 172 if (e.SocketError == SocketError.Success) 173 { 174 var socket = e.AcceptSocket; 175 if (socket.Connected) 176 { 177 //clear buffer 178 Array.Clear(e.Buffer, e.Offset, e.Count); 179 180 e.Completed -= ProcessReceiveFindFileRequest_Completed; 181 e.Completed += ProcessFilePosition_Completed; 182 183 if (!socket.ReceiveAsync(e)) 184 { 185 ProcessSendFile(e); 186 } 187 } 188 } 189 } 190 191 private void ProcessFilePosition_Completed(object sender, SocketAsyncEventArgs e) 192 { 193 ProcessSendFile(e); 194 } 195 196 private void ProcessSendFile(SocketAsyncEventArgs e) 197 { 198 var bytesRead = e.BytesTransferred; 199 if (bytesRead > 0 && e.SocketError == SocketError.Success) 200 { 201 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray(); 202 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag()); 203 if (dataList != null) 204 { 205 foreach (var request in dataList) 206 { 207 if (PacketUtils.IsPacketComplete(request)) 208 { 209 int startPosition = PacketUtils.GetRequestFileStartPosition(request); 210 211 var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount); 212 if (packetSize != 0) 213 { 214 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize); 215 byte[] packetNumber = BitConverter.GetBytes(startPosition / packetSize); 216 217 Console.WriteLine("Receive File Request PacketNumber: "+startPosition / packetSize); 218 219 if (filedata != null) 220 { 221 //ready to send back to Client 222 byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber); 223 224 Array.Clear(e.Buffer, e.Offset, e.Count); 225 Array.Copy(segmentedFileResponseData, 0, e.Buffer, e.Offset, segmentedFileResponseData.Length); 226 227 e.Completed -= ProcessFilePosition_Completed; 228 e.Completed += ProcessSendFile_Completed; 229 230 if (!e.AcceptSocket.SendAsync(e)) 231 { 232 CloseClientSocket(e); 233 } 234 } 235 } 236 } 237 } 238 } 239 } 240 else 241 { 242 CloseClientSocket(e); 243 } 244 } 245 246 247 private void ProcessSendFile_Completed(object sender, SocketAsyncEventArgs e) 248 { 249 CloseClientSocket(e); 250 } 251 252 253 private void CloseClientSocket(SocketAsyncEventArgs e) 254 { 255 try 256 { 257 e.AcceptSocket.Shutdown(SocketShutdown.Both); 258 e.AcceptSocket.Close(); 259 } 260 catch (Exception ex) 261 { 262 Console.WriteLine(ex.Message); 263 } 264 finally 265 { 266 _maxNumberAcceptedClients.Release(); 267