Pool /// <summary> /// 與每個客戶Socket相關聯,進行Send和Receive投遞時所需要的參數 /// </summary> public class IoContextPool { List<SocketAsyncEventArgs> pool; //為每一個Socke ...
Pool
/// <summary>
/// Fixed-capacity pool of <see cref="SocketAsyncEventArgs"/> instances that are
/// handed to client sockets for posting Send/Receive operations.
/// </summary>
/// <remarks>
/// All instances live in a single list. Entries at indexes below
/// <c>boundary</c> are free; entries at <c>boundary</c> and above are
/// currently handed out.
/// </remarks>
public class IoContextPool
{
    /// <summary>Guards <c>pool</c> and <c>boundary</c>.</summary>
    private readonly object gate = new object();

    /// <summary>Every context owned by the pool, free ones first.</summary>
    List<SocketAsyncEventArgs> pool;

    /// <summary>Maximum number of contexts the pool accepts.</summary>
    Int32 capacity;

    /// <summary>Index of the first handed-out entry; equals the number of free entries.</summary>
    Int32 boundary;

    /// <summary>Creates an empty pool that accepts up to <paramref name="capacity"/> contexts.</summary>
    /// <param name="capacity">Maximum number of contexts that can be added.</param>
    public IoContextPool(Int32 capacity)
    {
        this.pool = new List<SocketAsyncEventArgs>(capacity);
        this.boundary = 0;
        this.capacity = capacity;
    }

    /// <summary>
    /// Registers a newly created context with the pool and marks it as free.
    /// </summary>
    /// <param name="arg">The context to add.</param>
    /// <returns>
    /// <c>true</c> when the context was added; <c>false</c> when
    /// <paramref name="arg"/> is null or the pool already holds
    /// <c>capacity</c> entries.
    /// </returns>
    public bool Add(SocketAsyncEventArgs arg)
    {
        if (arg == null)
        {
            return false;
        }

        lock (this.gate)
        {
            if (this.pool.Count >= this.capacity)
            {
                return false;
            }

            this.pool.Add(arg);
            this.boundary++;
            return true;
        }
    }

    /// <summary>
    /// Takes a free context out of the pool.
    /// </summary>
    /// <returns>A free context, or <c>null</c> when none is available.</returns>
    public SocketAsyncEventArgs Pop()
    {
        lock (this.gate)
        {
            if (this.boundary <= 0)
            {
                return null;
            }

            --this.boundary;
            return this.pool[this.boundary];
        }
    }

    /// <summary>
    /// Returns a previously popped context to the pool so it can be reused.
    /// </summary>
    /// <param name="arg">The context being released.</param>
    /// <returns>
    /// <c>true</c> when the context was handed out and is now free again;
    /// <c>false</c> when <paramref name="arg"/> is null, was never handed out
    /// by this pool, or has already been returned.
    /// </returns>
    public bool Push(SocketAsyncEventArgs arg)
    {
        if (arg == null)
        {
            return false;
        }

        lock (this.gate)
        {
            // Search only the handed-out region. A context that is not there
            // (double release, or an instance this pool does not own) yields
            // -1, which must not be used as a list index.
            int index = this.pool.IndexOf(arg, this.boundary);
            if (index < 0)
            {
                return false;
            }

            // Swap the released context onto the boundary slot, then move the
            // boundary past it so the slot counts as free.
            if (index != this.boundary)
            {
                this.pool[index] = this.pool[this.boundary];
                this.pool[this.boundary] = arg;
            }

            this.boundary++;
            return true;
        }
    }
}
Server
/// <summary>
/// Form hosting an asynchronous TCP server built on
/// <see cref="SocketAsyncEventArgs"/>. Accepted clients are given a pooled
/// context, their requests are classified through <c>Hdate</c>, and a reply
/// is sent back; activity is appended to a rich text box and periodically
/// flushed to a log file.
/// </summary>
/// <remarks>
/// Ownership rule used throughout this class: a pooled context is returned to
/// the pool only from its own completion path (or before any operation was
/// posted on it). Code running beside a pending operation closes the socket
/// only and lets the resulting completion recycle the context.
/// </remarks>
public partial class IocpServer : Form
{
    private delegate void SetRichTextBoxCallBack(string str);

    private SetRichTextBoxCallBack setRichTextBoxcallback;

    public IocpServer()
    {
        setRichTextBoxcallback = new SetRichTextBoxCallBack(SetRichTextBoxReceive);
        InitializeComponent();
    }

    /// <summary>Listening socket that accepts client connection requests.</summary>
    Socket Socketlistener;

    /// <summary>Mutex acquired once at the end of <see cref="IocpServer_Load"/>.</summary>
    private static Mutex mutex = new Mutex();

    /// <summary>Number of clients currently connected.</summary>
    private Int32 numConnectedSockets;

    /// <summary>Maximum number of simultaneous connections (also the pool size and listen backlog).</summary>
    private Int32 numConnections = 8192;

    /// <summary>Buffer size, in bytes, used for each socket I/O operation.</summary>
    private Int32 bufferSize = 4028;

    /// <summary>Listening port, read from the "ServicePort" application setting.</summary>
    private Int32 bufferPort = Convert.ToInt32(ConfigurationManager.AppSettings["ServicePort"]);

    /// <summary>Value of the "ServiceAddress" application setting (not read elsewhere in this class).</summary>
    private String _GetAddress = ConfigurationManager.AppSettings["ServiceAddress"];

    /// <summary>Per-client state shared with <c>Hdate</c>.</summary>
    private List<ClientInformation> ListInfo = new List<ClientInformation>();

    /// <summary>Request parser / response builder.</summary>
    HttpDate Hdate = new HttpDate();

    /// <summary>Pool of contexts used to post Send/Receive operations.</summary>
    private IoContextPool ioContextPool;

    /// <summary>Time of the last log flush performed by <see cref="FileSave"/>.</summary>
    DateTime GetDate;

    TimeSpan UdpTime;

    /// <summary>Prefix of the log directory path.</summary>
    string FileTxt = Application.StartupPath + @"\FileTxt";

    /// <summary>Log directory used by the most recent flush.</summary>
    string FileName;

    /// <summary>Serializes the "claim" step of <see cref="CloseClientSocket(Socket, SocketAsyncEventArgs)"/>.</summary>
    private readonly object closeGate = new object();

    /// <summary>
    /// Pre-allocates the context pool, binds the listening socket and posts
    /// the first accept.
    /// </summary>
    private void IocpServer_Load(object sender, EventArgs e)
    {
        // Back-date the last flush by one hour so the FileSave call below runs immediately.
        GetDate = DateTime.Now.AddHours(-1);
        FileSave();

        this.numConnectedSockets = 0;
        this.ioContextPool = new IoContextPool(numConnections);

        // Pre-allocate one context (with its own buffer) per possible connection.
        for (Int32 i = 0; i < this.numConnections; i++)
        {
            SocketAsyncEventArgs ioContext = new SocketAsyncEventArgs();
            ioContext.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
            ioContext.SetBuffer(new Byte[this.bufferSize], 0, this.bufferSize);
            this.ioContextPool.Add(ioContext);
        }

        // Bind to the last address reported for this machine.
        IPAddress[] addressList = Dns.GetHostEntry(Environment.MachineName).AddressList;
        IPEndPoint localEndPoint = new IPEndPoint(addressList[addressList.Length - 1], bufferPort);

        this.Socketlistener = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        this.Socketlistener.ReceiveBufferSize = this.bufferSize;
        this.Socketlistener.SendBufferSize = this.bufferSize;

        if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
        {
            // Option 27 is IPV6_V6ONLY; turning it off makes the listener dual-mode (IPv4 and IPv6).
            this.Socketlistener.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);
            this.Socketlistener.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
        }
        else
        {
            this.Socketlistener.Bind(localEndPoint);
        }

        this.Socketlistener.Listen(this.numConnections);

        // Post the first accept on the listening socket.
        this.StartAccept(null);

        // NOTE(review): nothing in this class releases the mutex, and no other
        // visible code waits on it — confirm whether this call is still needed.
        mutex.WaitOne();
        rTBoxInformation.Invoke(setRichTextBoxcallback, "伺服器開始監聽");
    }

    /// <summary>
    /// Handles a completed accept and always posts the next one.
    /// </summary>
    /// <param name="e">Context associated with the completed accept operation.</param>
    private void ProcessAccept(SocketAsyncEventArgs e)
    {
        // The listener was closed; there is nothing left to accept on.
        if (e.SocketError == SocketError.OperationAborted)
        {
            return;
        }

        Socket s = e.AcceptSocket;
        if (s != null && s.Connected)
        {
            this.ServeAcceptedClient(s);
        }
        else if (s != null)
        {
            // Failed accept: release the unusable socket before the context is reused.
            s.Close();
        }

        // Re-arm the accept regardless of how this one ended; otherwise a
        // single failed accept would stop the server from accepting for good.
        this.StartAccept(e);
    }

    /// <summary>
    /// Attaches a pooled context to a freshly accepted client and posts its
    /// first receive, or rejects the client when the pool is exhausted.
    /// </summary>
    /// <param name="s">The connected client socket.</param>
    private void ServeAcceptedClient(Socket s)
    {
        SocketAsyncEventArgs ioContext = null;
        bool receivePosted = false;
        try
        {
            ioContext = this.ioContextPool.Pop();
            if (ioContext != null)
            {
                ioContext.UserToken = s;
                Interlocked.Increment(ref this.numConnectedSockets);
                string outStr = String.Format("客戶 {0} 連入, 共有 {1} 個連接。", s.RemoteEndPoint.ToString(), this.numConnectedSockets);
                rTBoxInformation.Invoke(setRichTextBoxcallback, outStr);

                bool pending = s.ReceiveAsync(ioContext);
                receivePosted = true;
                if (!pending)
                {
                    // Completed synchronously: no Completed event will be raised.
                    this.ProcessReceive(ioContext);
                }
            }
            else
            {
                // Pool exhausted: tell the client and drop the connection.
                try
                {
                    s.Send(Encoding.Default.GetBytes("連接已經達到最大數!"));
                    string outStr = String.Format("連接已滿,拒絕 {0} 的連接。", s.RemoteEndPoint);
                    rTBoxInformation.Invoke(setRichTextBoxcallback, outStr);
                }
                finally
                {
                    s.Close();
                }
            }
        }
        catch (SocketException ex)
        {
            // Report against the accepted socket; the accept context carries no UserToken.
            string outStr = String.Format("接收客戶 {0} 數據出錯, 異常信息: {1} 。", DescribeRemote(s), ex.ToString());
            AdoInsertTemp.AddServerErrorLog("接收客戶數據出錯:[IcopServer代碼行號177]" + ex.Message);
            rTBoxInformation.Invoke(setRichTextBoxcallback, outStr);
        }
        catch (Exception ex)
        {
            rTBoxInformation.Invoke(setRichTextBoxcallback, ex.Message);
            AdoInsertTemp.AddServerErrorLog("監聽Socket接受處理:[IcopServer代碼行號182]" + ex.Message);
        }
        finally
        {
            // If no receive was ever posted, nothing else will release the
            // context, so give it back (and close the client) here.
            if (ioContext != null && !receivePosted)
            {
                this.CloseClientSocket(s, ioContext);
            }
        }
    }

    /// <summary>
    /// Posts an accept operation on the listening socket.
    /// </summary>
    /// <param name="acceptEventArg">
    /// Context to reuse for the accept, or <c>null</c> to create a new one.
    /// </param>
    private void StartAccept(SocketAsyncEventArgs acceptEventArg)
    {
        if (acceptEventArg == null)
        {
            acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
        }
        else
        {
            // The context must not carry the previous client socket when reused.
            acceptEventArg.AcceptSocket = null;
        }

        if (!this.Socketlistener.AcceptAsync(acceptEventArg))
        {
            // Completed synchronously: no Completed event will be raised.
            this.ProcessAccept(acceptEventArg);
        }
    }

    /// <summary>
    /// Handles a completed receive: accumulates the data, and once the socket
    /// has nothing more buffered, classifies the request and sends the reply.
    /// </summary>
    /// <param name="e">Context associated with the completed receive operation.</param>
    private void ProcessReceive(SocketAsyncEventArgs e)
    {
        // Zero bytes means the remote host closed the connection.
        if (e.BytesTransferred <= 0)
        {
            this.CloseClientSocket(e);
            return;
        }

        if (e.SocketError != SocketError.Success)
        {
            this.ProcessError(e);
            return;
        }

        Socket s = e.UserToken as Socket;
        if (s == null)
        {
            // The context has already been released.
            return;
        }

        // Becomes true once an operation has been posted on the context from
        // here on; after that the context belongs to that operation's completion.
        bool ioPosted = false;
        try
        {
            ClientInformation client = Hdate.AddClient(ListInfo, s.RemoteEndPoint.ToString(), e.BytesTransferred, e.Buffer);

            if (s.Available != 0)
            {
                // More data is already buffered: keep receiving before answering.
                bool receivePending = s.ReceiveAsync(e);
                ioPosted = true;
                if (!receivePending)
                {
                    this.ProcessReceive(e);
                }
                return;
            }

            IPEndPoint localEp = s.RemoteEndPoint as IPEndPoint;
            bool isClose = false;
            client = Hdate.GetClient(ListInfo, client);
            string strText = client.Rend;
            RequestType REQUESTTYPE = Hdate.Request_Type(strText);
            RequestDeal REQUESTDEAL = Hdate.Request_Deal(strText);

            if (REQUESTTYPE == RequestType.TypeGet && REQUESTDEAL == RequestDeal.GetConfiguration)
            {
                this.LogRequest(localEp, strText);
                this.StageReply(e, localEp, CommonMethod.GetSend(bufferSize, Encoding.ASCII.GetBytes(Hdate.RtrunhttpNew(OutputPrint.ResponseGetFromNew.Replace("[GetSN]", client.DevicesSn)))));
            }
            else if (REQUESTTYPE == RequestType.TypeGet && REQUESTDEAL == RequestDeal.GetInfo)
            {
                this.LogRequest(localEp, strText);
                this.StageReply(e, localEp, this.BuildOkReply());
            }
            else if (REQUESTTYPE == RequestType.TypeGet && REQUESTDEAL == RequestDeal.GetOrders)
            {
                // Send the first waiting command if there is one, otherwise the plain OK reply.
                this.LogRequest(localEp, strText);
                byte[] data;
                if (client.WaitingName != null && client.WaitingName.Count > 0)
                {
                    data = CommonMethod.GetSend(bufferSize, Encoding.ASCII.GetBytes(Hdate.RtrunhttpNew(client.WaitingName[0])));
                }
                else
                {
                    data = this.BuildOkReply();
                }
                this.StageReply(e, localEp, data);
            }
            else if (REQUESTTYPE == RequestType.TypePOST && REQUESTDEAL == RequestDeal.PostInfo)
            {
                Hdate.RemoveLength(client, strText);
                this.LogRequest(localEp, strText);
                this.StageReply(e, localEp, this.BuildOkReply());
            }
            else if (REQUESTTYPE == RequestType.TypePOST && REQUESTDEAL == RequestDeal.PostAttTable)
            {
                Hdate.GetTable(strText, client);
                this.LogRequest(localEp, strText);
                this.StageReply(e, localEp, this.BuildOkReply());
            }
            else if (!client.IsData)
            {
                Hdate.RemoveLength(client, strText);
                this.LogRequest(localEp, strText);
                this.StageReply(e, localEp, this.BuildOkReply());
            }
            else
            {
                // NOTE(review): no reply is staged on this path, so the send
                // below transmits whatever the context buffer currently
                // holds — confirm this is intended.
                Hdate.GetTable(strText, client);
            }

            bool sendPending = s.SendAsync(e);
            ioPosted = true;
            if (!sendPending)
            {
                // Completed synchronously: no Completed event will be raised.
                this.ProcessSend(e, isClose);
            }

            if (client != null && !client.IsData)
            {
                Hdate.AddAtt(client, ListInfo);

                // NOTE(review): this blocks the completion thread for six
                // seconds per request — presumably to let the reply drain
                // before disconnecting; confirm before changing.
                Thread.Sleep(6000);

                // An operation posted above may still be pending on the
                // context, so close only the socket; the completion of that
                // operation returns the context to the pool.
                this.ShutdownSocket(s);
            }
        }
        catch (Exception ex)
        {
            AdoInsertTemp.AddServerErrorLog("接收完成時處理函數:[IcopServer代碼行號330]" + ex.Message);
            if (ioPosted)
            {
                this.ShutdownSocket(s);
            }
            else
            {
                this.CloseClientSocket(s, e);
            }
        }
    }

    /// <summary>Writes the incoming request text to the on-screen log.</summary>
    private void LogRequest(IPEndPoint remote, string text)
    {
        rTBoxInformation.Invoke(setRichTextBoxcallback, string.Format("[來自{0}]{1}", remote, text));
    }

    /// <summary>Builds the reply bytes for the plain <c>OutputPrint.capsOk</c> response.</summary>
    private byte[] BuildOkReply()
    {
        return CommonMethod.GetSend(bufferSize, Encoding.ASCII.GetBytes(Hdate.RtrunhttpNew(OutputPrint.capsOk)));
    }

    /// <summary>Installs <paramref name="data"/> as the context's send buffer and logs it.</summary>
    private void StageReply(SocketAsyncEventArgs e, IPEndPoint remote, byte[] data)
    {
        e.SetBuffer(data, e.Offset, data.Length);
        rTBoxInformation.Invoke(setRichTextBoxcallback, string.Format("向{0}發送:{1}", remote, Encoding.UTF8.GetString(data)));
    }

    /// <summary>
    /// Handles a completed send by posting the next receive on the same context.
    /// </summary>
    /// <param name="e">Context associated with the completed send operation.</param>
    /// <param name="isReceive">Not used by this method.</param>
    private void ProcessSend(SocketAsyncEventArgs e, bool isReceive)
    {
        try
        {
            if (e.SocketError != SocketError.Success)
            {
                this.ProcessError(e);
                return;
            }

            Socket s = (Socket)e.UserToken;
            if (s == null)
            {
                return;
            }

            // The send replaced the context's buffer with the reply. Restore a
            // full-size receive window; allocate a fresh buffer if the reply
            // buffer is too small to provide one.
            if (e.Buffer != null && e.Buffer.Length >= bufferSize)
            {
                e.SetBuffer(0, bufferSize);
            }
            else
            {
                e.SetBuffer(new Byte[bufferSize], 0, bufferSize);
            }

            if (!s.ReceiveAsync(e))
            {
                // Completed synchronously: no Completed event will be raised.
                this.ProcessReceive(e);
            }
        }
        catch (Exception ex)
        {
            rTBoxInformation.Invoke(setRichTextBoxcallback, ex.Message);
            AdoInsertTemp.AddServerErrorLog("發送完成時處理函數:[IcopServer代碼行號390]" + ex.Message);
            this.ProcessError(e);
        }
    }

    /// <summary>
    /// Dispatches a completed send or receive to its handler.
    /// </summary>
    /// <param name="sender">Object that raised the event.</param>
    /// <param name="e">Context associated with the completed operation.</param>
    /// <exception cref="ArgumentException">The completed operation was neither a receive nor a send.</exception>
    private void OnIOCompleted(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                this.ProcessReceive(e);
                break;
            case SocketAsyncOperation.Send:
                this.ProcessSend(e, true);
                break;
            default:
                throw new ArgumentException("The last operation completed on the socket was not a receive or send");
        }
    }

    /// <summary>
    /// Handles a socket error by closing the client and logging the error.
    /// </summary>
    /// <param name="e">Context whose operation failed.</param>
    private void ProcessError(SocketAsyncEventArgs e)
    {
        try
        {
            Socket s = e.UserToken as Socket;
            if (s == null)
            {
                // The context has already been released.
                return;
            }

            // Reading the endpoint can fail on a socket that is already
            // closed; that must not prevent the cleanup below.
            IPEndPoint localEp = null;
            try
            {
                localEp = s.LocalEndPoint as IPEndPoint;
            }
            catch (SocketException)
            {
            }
            catch (ObjectDisposedException)
            {
            }

            this.CloseClientSocket(s, e);
            string outStr = String.Format("套接字錯誤 {0}, IP {1}, 操作 {2}。", (Int32)e.SocketError, localEp, e.LastOperation);
            rTBoxInformation.Invoke(setRichTextBoxcallback, outStr);
        }
        catch (Exception ex)
        {
            AdoInsertTemp.AddServerErrorLog("處理socket錯誤:[IcopServer代碼行號431]" + ex.Message);
        }
    }

    /// <summary>
    /// Closes the client attached to <paramref name="e"/> and recycles the context.
    /// </summary>
    /// <param name="e">Context associated with the completed send/receive operation.</param>
    private void CloseClientSocket(SocketAsyncEventArgs e)
    {
        Socket s = e.UserToken as Socket;
        this.CloseClientSocket(s, e);
    }

    /// <summary>
    /// Callback raised when an accept operation completes asynchronously.
    /// </summary>
    /// <param name="sender">Object that raised the event.</param>
    /// <param name="e">Context associated with the completed accept operation.</param>
    private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        this.ProcessAccept(e);
    }

    /// <summary>
    /// Releases a client: updates the connection count, closes the socket and
    /// returns the context to the pool. Calling it again for a context that
    /// has already been released (or re-assigned to another socket) does nothing.
    /// </summary>
    /// <param name="s">The client socket to close.</param>
    /// <param name="e">The context currently attached to <paramref name="s"/>.</param>
    private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)
    {
        try
        {
            if (s == null || e == null)
            {
                return;
            }

            // Claim the release exactly once: the first caller detaches the
            // socket from the context, later callers see a different token.
            lock (this.closeGate)
            {
                if (!object.ReferenceEquals(e.UserToken, s))
                {
                    return;
                }
                e.UserToken = null;
            }

            Interlocked.Decrement(ref this.numConnectedSockets);
            try
            {
                string outStr = String.Format("客戶 {0} 斷開, 共有 {1} 個連接。", DescribeRemote(s), this.numConnectedSockets);
                rTBoxInformation.Invoke(setRichTextBoxcallback, outStr);
                this.ShutdownSocket(s);
            }
            finally
            {
                // Even if logging fails, the socket must be closed and the
                // context made available again.
                s.Close();
                this.ioContextPool.Push(e);
            }
        }
        catch (Exception ex)
        {
            AdoInsertTemp.AddServerErrorLog("sokect關閉:[IcopServer代碼行號467]" + ex.Message);
        }
    }

    /// <summary>
    /// Shuts down, disconnects and closes a socket without touching any
    /// context or the connection count.
    /// </summary>
    /// <param name="s">The socket to close; ignored when null.</param>
    private void ShutdownSocket(Socket s)
    {
        if (s == null)
        {
            return;
        }

        try
        {
            s.Shutdown(SocketShutdown.Send);
            s.Disconnect(true);
        }
        catch (ObjectDisposedException)
        {
            // Already closed by another path; nothing left to do.
        }
        catch (Exception ex)
        {
            rTBoxInformation.Invoke(setRichTextBoxcallback, ex.Message);
            AdoInsertTemp.AddServerErrorLog("sokect關閉:[IcopServer代碼行號477]" + ex.Message);
        }
        finally
        {
            s.Close();
        }
    }

    /// <summary>
    /// Returns the remote endpoint of <paramref name="s"/> as text, or
    /// "unknown" when it cannot be read (socket missing, closed or never connected).
    /// </summary>
    private static string DescribeRemote(Socket s)
    {
        if (s == null)
        {
            return "unknown";
        }

        try
        {
            EndPoint remote = s.RemoteEndPoint;
            return remote != null ? remote.ToString() : "unknown";
        }
        catch (SocketException)
        {
            return "unknown";
        }
        catch (ObjectDisposedException)
        {
            return "unknown";
        }
    }

    /// <summary>
    /// Appends a line to the on-screen log, scrolls to it, and gives
    /// <see cref="FileSave"/> a chance to flush the log to disk.
    /// </summary>
    /// <param name="str">Text to append.</param>
    private void SetRichTextBoxReceive(string str)
    {
        rTBoxInformation.AppendText(str);

        // Move the caret to the end and scroll it into view.
        rTBoxInformation.Select(this.rTBoxInformation.TextLength, 0);
        rTBoxInformation.ScrollToCaret();

        rTBoxInformation.AppendText("\r\n");
        FileSave();
    }

    /// <summary>
    /// When at least one hour has passed since the last flush, writes the
    /// on-screen log to a new timestamped file and clears the text box.
    /// </summary>
    private void FileSave()
    {
        // TotalHours, not Hours: Hours is only the 0-23 component and drops
        // back to 0 once the gap reaches a whole day.
        TimeSpan sinceLastFlush = DateTime.Now - GetDate;
        if (sinceLastFlush.TotalHours < 1)
        {
            return;
        }

        FileName = FileTxt + DateTime.Now.ToString("yyyyMMddHH");

        // CreateDirectory does nothing when the directory already exists.
        Directory.CreateDirectory(FileName);

        string logPath = FileName + @"\log_" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ".txt";
        using (FileStream fs = new FileStream(logPath, FileMode.Create))
        using (StreamWriter sw = new StreamWriter(fs))
        {
            sw.Write(rTBoxInformation.Text);
        }

        rTBoxInformation.Clear();
        GetDate = DateTime.Now;
    }

    /// <summary>Cancels the close request and hides the form instead.</summary>
    private void IocpServer_FormClosing(object sender, FormClosingEventArgs e)
    {
        e.Cancel = true;
        this.Hide();
    }

    /// <summary>Shows the form again and restores it to its normal window state.</summary>
    private void notifyIcon_MouseDoubleClick(object sender, MouseEventArgs e)
    {
        this.Show();
        WindowState = FormWindowState.Normal;
    }
}
更新中....