高性能TcpServer(C#) - 1.網路通信協議
高性能TcpServer(C#) - 2.創建高性能Socket伺服器SocketAsyncEventArgs的實現(IOCP)
高性能TcpServer(C#) - 3.命令通道(處理:掉包,粘包,垃圾包)
高性能TcpServer(C#) - 4.文件通道(處理:文件分包,支持斷點續傳)
代碼解析
SocketAsyncEventArgs對象管理 -- 用於CheckOut/CheckIn SocketAsyncEventArgs對象
SocketArgsPool socketArgsPool = new SocketArgsPool(MAX_CLIENTCOUNT);// create the pool, constructed with MAX_CLIENTCOUNT
this.m_EventArgs = this.m_socketArgsPool.CheckOut();// initialize the object: take a SocketAsyncEventArgs from the pool
this.m_socketArgsPool.CheckIn(m_EventArgs);// recycle the object: return it to the same pool it was checked out from
SocketArgsBufferPool對象管理 -- 用於CheckOut/CheckIn SocketAsyncEventArgs的Buffer
SocketArgsBufferPool bufferPool = new SocketArgsBufferPool(MAX_CLIENTCOUNT, MAX_CLIENTBUFFERSIZE);// create the buffer pool, constructed with MAX_CLIENTCOUNT and MAX_CLIENTBUFFERSIZE
this.m_bufferPool.CheckOut(this.m_EventArgs);// set the buffer on the event args (SetBuffer)
this.m_bufferPool.CheckIn(m_EventArgs);// recycle the object
SocketEntityPool對象管理 -- 用於CheckOut/CheckIn SocketEntity
SocketEntityPool socketEntityPool = new SocketEntityPool(MAX_CLIENTCOUNT, MAX_CLIENTBUFFERSIZE);// initialization
m_socketEntity = this.m_socketEntityPool.CheckOut();// take a SocketEntity from the pool
m_socketEntity.SocketClient = socket;// attach this client's socket to the entity
m_bufferRecv = m_socketEntity.BufferRecv; m_bufferRecv.Clear();// receive buffer of each client
m_handle = m_socketEntity.ProtocolHandle;// handler class of each client
m_analysis = m_socketEntity.ProtocolAnalysis;// parser class of each client
this.m_socketEntityPool.CheckIn(socketEntity);// recycle the object
部分代碼
伺服器監聽和接收客戶端連接
/// <summary>
/// Creates the TCP listener socket, binds it to <see cref="IPAddress.Any"/> on the
/// given port, starts listening with a backlog of 100 and posts the first accept.
/// </summary>
/// <param name="port">Local TCP port to listen on.</param>
public void Start(int port)
{
    // Build the endpoint before the socket, as the original does.
    IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);

    Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    this.m_listenerSocket = listener;

    listener.Bind(localEndPoint);
    listener.Listen(100);

    // Kick off the asynchronous accept loop.
    ListenForConnection(m_listenerArgs);
}
/// <summary>
/// Posts an asynchronous accept on the listener socket; completion is reported
/// to <c>AcceptAsyncCompleted</c>.
/// </summary>
/// <param name="args">Event args to use for the accept operation.</param>
void ListenForConnection(SocketAsyncEventArgs args)
{
    lock (this)
    {
        // The same args instance is passed back in after each completed accept,
        // so drop the previously accepted socket before reusing it.
        args.AcceptSocket = null;

        SocketAsyncMethod acceptOperation = new SocketAsyncMethod(m_listenerSocket.AcceptAsync);
        m_listenerSocket.InvokeAsyncMethod(acceptOperation, AcceptAsyncCompleted, args);
    }
}
/// <summary>
/// Completion callback for an asynchronous accept. Registers the accepted client
/// when the connection limit allows it, rejects and closes it otherwise, and then
/// posts the next accept.
/// </summary>
/// <param name="sender">Source of the completion event (not used).</param>
/// <param name="e">Event args carrying the accept result and the accepted socket.</param>
void AcceptAsyncCompleted(object sender, SocketAsyncEventArgs e)
{
    if (e.SocketError == SocketError.OperationAborted)
    {
        CLogHelp.AppendLog("[Error] AcceptAsyncCompleted:SocketError.OperationAborted");
        return; // Server was stopped: do not post another accept.
    }

    if (e.SocketError == SocketError.Success)
    {
        Socket acceptSocket = e.AcceptSocket;
        if (acceptSocket != null)
        {
            if (connections + 1 <= MAX_CLIENTCOUNT)
            {
                IPEndPoint clientEP = (IPEndPoint)acceptSocket.RemoteEndPoint;
                sn = String.Format("{0}:{1}", clientEP.Address.ToString(), clientEP.Port);
                lock (LockIndex)
                {
                    // Increment atomically in place. Assigning the result back to the
                    // field would add a second, non-atomic write that can overwrite a
                    // concurrent update made by another thread.
                    Interlocked.Increment(ref connections);
                    Program.AddMessage("已連接,sn:" + sn + ",當前連接數:" + CServerIntance.connections.ToString());
                }
                CSocketDAO socketDao = new CSocketDAO(socketArgsPool, bufferPool, socketEntityPool, acceptSocket, sn);
                CSingleton<CClientMgr>.GetInstance().AddOnlineClient(socketDao);
            }
            else
            {
                Program.AddMessage("超過最大連接數:" + MAX_CLIENTCOUNT.ToString() + ",拒接連接");
                // Actually refuse the client: without closing, the accepted socket
                // stays open and its handle is leaked.
                acceptSocket.Close();
            }
        }
    }
    else
    {
        // Any other accept failure used to be ignored silently; record it.
        CLogHelp.AppendLog("[Error] AcceptAsyncCompleted:SocketError=" + e.SocketError.ToString());
    }

    // Continue to accept.
    ListenForConnection(e);
}
伺服器數據處理
/// <summary>
/// Completion callback for an asynchronous receive. Detects disconnects, appends
/// the received bytes to the client's receive buffer, processes every packet
/// currently buffered, and then posts the next receive.
/// </summary>
/// <param name="sender">Source of the completion event (not used).</param>
/// <param name="e">Event args carrying the receive result and data.</param>
void ReceiveAsyncCompleted(object sender, SocketAsyncEventArgs e)
{
    if (!this.m_connected)
    {
        return;
    }

    try
    {
        m_EventArgs = e;

        // Test the error code before the byte count: a failed receive normally also
        // reports zero bytes, so checking BytesTransferred first would log socket
        // errors as graceful disconnects and never reach this branch.
        if (m_EventArgs.SocketError != SocketError.Success)
        {
            SocketCatchError("SocketError=" + (e.SocketError).ToString()); // NOT graceful disconnect
            return;
        }
        if (m_EventArgs.BytesTransferred == 0)
        {
            SocketCatchError("BytesTransferred=0"); // Graceful disconnect
            return;
        }

        // Store the received data.
        recvTime = DateTime.Now;
        m_bufferRecv.Put(e);
        m_analysis.BagStatus = CProtocolAnalysis.EBagStatus.BagNone;

        // Sticky-packet handling: keep processing while buffered data remains.
        while (m_bufferRecv.HasRemaining())
        {
            // Lost-packet handling: stop and wait for more data.
            if (CProtocolAnalysis.EBagStatus.BagLost == m_analysis.BagStatus)
            {
                break;
            }

            m_handle.Process(m_bufferRecv, m_analysis, m_strSn); // Parse the data (junk-packet handling)

            // Register the client's uid the first time the parser reports one.
            if (string.IsNullOrEmpty(m_strUid))
            {
                if (!string.IsNullOrEmpty(m_analysis.Uid))
                {
                    m_strUid = m_analysis.Uid;
                    CSingleton<CClientMgr>.GetInstance().AddClientUid(m_strUid, m_strSn, this);
                }
            }

            if (m_analysis.WhetherToSend)
            {
                string data = CProtocolBase.GetProtocol(m_analysis);
                SendRealTime(data);
            }
        }

        // Post the next receive.
        ListenForData(e);
    }
    catch (Exception ex)
    {
        // NOTE(review): when an exception lands here, ListenForData is skipped, so no
        // further receive is posted from this method — confirm whether the connection
        // should be closed or re-armed in that case.
        CLogHelp.AppendLog("[Error] ReceiveAsyncCompleted,errmsg:" + ex.Message);
    }
}