版權聲明 版權聲明:原創文章 禁止轉載 請通過右側公告中的“聯繫郵箱([email protected])”聯繫我 勿用於學術性引用。 勿用於商業出版、商業印刷、商業引用以及其他商業用途。 本文不定期修正完善。 本文鏈接:http://www.cnblogs.com/wlsandwho/p/ ...
=================================版權聲明=================================
版權聲明:原創文章 禁止轉載
請通過右側公告中的“聯繫郵箱([email protected])”聯繫我
勿用於學術性引用。
勿用於商業出版、商業印刷、商業引用以及其他商業用途。
本文不定期修正完善。
本文鏈接:http://www.cnblogs.com/wlsandwho/p/5427091.html
恥辱牆:http://www.cnblogs.com/wlsandwho/p/4206472.html
=======================================================================
先放個草稿。有空寫寫,沒空就算了。
=======================================================================
簡單的做了幾個類。
把Socket部分、IOCP部分、線程部分分割開。便於實現自己想要的樣子。實現了ACCEPT、RECV的自定義處理。
沒有寫SEND部分。
=======================================================================
頭文件
1 #pragma once 2 3 //SIT--Socket,IOCP,Thread Pool 4 //by wls 5 //20160419 6 #include <list> 7 8 #include <atlstr.h> 9 10 #include <winsock2.h> 11 #include <MSWSock.h> 12 #include <Ws2tcpip.h> 13 #pragma comment(lib,"ws2_32.lib") 14 15 #include <functional> 16 #include <memory> 17 //緩衝區最大大小 18 static const int BUFFER_SIZE_MAX = 3096; 19 20 class CPerIOContextBase; 21 class CPerSocketContextBase; 22 class CSITContextBase; 23 class CSocketNetBase; 24 class CIOCPBase; 25 class CThreadpoolBase; 26 class CSITBase; 27 28 //操作類型 29 typedef enum _OPERATION_TYPE 30 { 31 NULL_POSTED=0, // 用於初始化,無意義 32 ACCEPT_POSTED, // 標誌投遞的Accept操作 33 SEND_POSTED, // 標誌投遞的是發送操作 34 RECV_POSTED // 標誌投遞的是接收操作 35 }OPERATION_TYPE; 36 37 //工作線程單元 38 typedef struct _WorkerThreadUnit 39 { 40 CSITBase* pSIT; //SIT指針 41 DWORD dwThreadID; //線程ID 42 }WorkerThreadUnit, *PWorkerThreadUnit; 43 44 //SIT 45 class CSITBase 46 { 47 public: 48 virtual bool Init(CIOCPBase* pIOCP, CThreadpoolBase* pThreadpool, CSocketNetBase* pSocketSocket,CSITContextBase* pSITContext); 49 public: 50 virtual bool AssociateSocketNetWithIOCP(CPerSocketContextBase* pSocketContext); 51 virtual bool HandleError(CPerSocketContextBase* pSocketContext, DWORD dwError, DWORD dwThreadID); 52 virtual bool PostAccept(CPerIOContextBase* pAcceptContext); 53 virtual bool DoAccept(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext); 54 virtual bool DoRecv(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext); 55 virtual void AddSocketContext(CPerSocketContextBase* pSocketContext); 56 virtual void RemoveSocketContext(CPerSocketContextBase* pSocketContext); 57 public: 58 CIOCPBase* GetIOCP(); 59 CSITContextBase* GetSITContext(); 60 CThreadpoolBase* GetThreadpool(); 61 CSocketNetBase* GetSocketNet(); 62 HANDLE GetIOCPObj(); 63 protected: 64 CIOCPBase* m_pIOCP; 65 CThreadpoolBase* m_pThreadpool; 66 CSocketNetBase* m_pSocketNet; 67 CSITContextBase* m_pSITContext; 68 }; 69 70 //每次IO要用到的結構 71 class CPerIOContextBase 72 { 73 public: 74 CPerIOContextBase(); 75 ~CPerIOContextBase(); 76 77 public: 78 //重置緩衝區 79 void ResetBuffer(); 80 81 public: 82 //位於首部 83 OVERLAPPED overlapped; 84 //套接字 85 SOCKET socket; 86 //WSABUF結構,其buf欄位指向szBuffer 87 WSABUF wsaBuf; 88 //實際緩衝區 89 char szBuffer[BUFFER_SIZE_MAX]; 90 //本次IO操作的類型 91 OPERATION_TYPE operationtype; 92 }; 93 //每個Socket要用到的結構 94 class CPerSocketContextBase 95 { 96 public: 97 CPerSocketContextBase(); 98 ~CPerSocketContextBase(); 99 public: 100 //獲得一個新的單次IO上下文 101 virtual CPerIOContextBase* GetNewPerIOContext(); 102 //刪除指定的單次IO上下文 103 virtual void RemovePerIOContext(CPerIOContextBase* pPerIOContext); 104 public: 105 SOCKET socket; 106 SOCKADDR_IN socketaddr; 107 std::list<CPerIOContextBase*> listPerIOContext;//該套接字中用到的所有的單次IO上下文都放在這裡 108 }; 109 110 //容納所有的CPerSocketContextBase 111 //其中每一個CPerSocketContextBase中又容納了它所對應的所有的CPerIOContextBase 112 class CSITContextBase 113 { 114 public: 115 virtual bool Init(); 116 public: 117 virtual void AddSocketContext(CPerSocketContextBase* pSocketContext); 118 virtual void RemoveSocketContext(CPerSocketContextBase* pSocketContext); 119 protected: 120 CRITICAL_SECTION m_cs; 121 std::list<CPerSocketContextBase*> m_listPerSocketContext; 122 }; 123 124 //IOCP 125 class CIOCPBase 126 { 127 public: 128 virtual bool Init(CSITBase* pSIT); 129 HANDLE Associate(HANDLE hFileHandle, HANDLE ExistingCompletionPort, ULONG_PTR CompletionKey); 130 virtual bool HandleError(CPerSocketContextBase* pSocketContext,DWORD dwError,DWORD dwThreadID); 131 132 protected: 133 HANDLE Create(int nConcurrentThread = 0); 134 135 public: 136 HANDLE GetIOCPObj(); 137 CSITBase* GetSIT(); 138 139 protected: 140 HANDLE m_hIOCompletionPort; 141 CSITBase* m_pSIT; 142 }; 143 144 //用戶操作 145 class CUserOperationBase 146 { 147 public: 148 virtual bool DoAccept(LPVOID pVoid); 149 virtual bool DoRecv(LPVOID pVoid); 150 virtual bool DoPost(LPVOID pVoid); 151 }; 152 153 154 //線程池 155 class CThreadpoolBase 156 { 157 public: 158 CThreadpoolBase(); 159 ~CThreadpoolBase(); 160 public: 161 virtual bool Init(CSITBase* pSIT); 162 163 protected: 164 //獲得合理線程總數 165 virtual int GetNumOfThreads(int nAdjustment = 2); 166 167 protected: 168 //線程工作函數 169 static DWORD WINAPI WorkerThread(LPVOID lpParam); 170 171 protected: 172 HANDLE* m_phWorkerThreads; 173 int m_NumOfThreads; 174 175 protected: 176 177 CSITBase* m_pSIT; 178 }; 179 180 //網路套接字 181 class CSocketNetBase 182 { 183 public: 184 //用戶操作函數 185 typedef std::function<bool(LPVOID)> FuncUserOperation; 186 187 //設置用戶操作 188 virtual bool SetUserOperation(OPERATION_TYPE eOperationType,FuncUserOperation pFunc); 189 public: 190 CSocketNetBase(); 191 192 public: 193 static bool IsAlive(SOCKET& s); 194 195 public: 196 virtual bool Init(CSITBase* pSIT); 197 public: 198 virtual bool PostAccept(CPerIOContextBase* pAcceptContext); 199 virtual bool DoAccept(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext); 200 201 virtual bool PostRecv(CPerIOContextBase* pIOContext); 202 virtual bool DoRecv(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext); 203 204 protected: 205 virtual bool AssociateIOCP(); 206 207 virtual SOCKET CreateServerSocket(); 208 virtual SOCKET BindServerSocket(); 209 virtual int ListenServerSocket(); 210 virtual bool AcceptConnection(int nPreparation); 211 virtual bool AcceptNewClient(CPerIOContextBase* pAcceptContext); 212 virtual SOCKET CreateClientSocket(); 213 virtual bool InitSocketLibrary(); 214 virtual bool SetLocalIP(); 215 216 protected: 217 virtual LPFN_ACCEPTEX GetFuncPtrAcceptEx(); 218 virtual LPFN_GETACCEPTEXSOCKADDRS GetFuncPtrGetAcceptExSockAddrs(); 219 protected: 220 CString m_strServerIP; 221 int m_nPort; 222 struct sockaddr_in m_ServerAddress; 223 224 CPerSocketContextBase* m_pListenContext; 225 226 CSITBase* m_pSIT; 227 228 FuncUserOperation m_funcUserOperation_ACCEPT; 229 FuncUserOperation m_funcUserOperation_RECV; 230 FuncUserOperation m_funcUserOperation_POST; 231 232 LPFN_ACCEPTEX m_lpfnAcceptEx; 233 LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs; 234 };
實現文件
1 #include "stdafx.h" 2 #include "SIT.h" 3 #include "vld.h" 4 //獲得錯誤文本信息 5 CString GetLastErrorMessage(DWORD dwLastError) 6 { 7 LPTSTR lpMsgBuf = NULL; 8 9 FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 10 NULL, 11 dwLastError, 12 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), 13 (LPTSTR)&lpMsgBuf, 14 0, 15 NULL); 16 17 CString strMsg(lpMsgBuf); 18 19 LocalFree(lpMsgBuf); 20 21 return strMsg; 22 } 23 24 bool CSITBase::Init(CIOCPBase* pIOCP, CThreadpoolBase* pThreadpool, CSocketNetBase* pSocketSocket,CSITContextBase* pSITContext) 25 { 26 if (pIOCP && pThreadpool && pSocketSocket && pSITContext) 27 { 28 m_pIOCP = pIOCP; 29 m_pThreadpool = pThreadpool; 30 m_pSocketNet = pSocketSocket; 31 m_pSITContext = pSITContext; 32 } 33 else 34 { 35 OutputDebugString(TEXT("CSITBase的Init參數不正確")); 36 37 return false; 38 } 39 40 return pSITContext->Init()&& 41 pIOCP->Init(this)&& 42 pThreadpool->Init(this) && 43 pSocketSocket->Init(this); 44 45 } 46 47 bool CSITBase::AssociateSocketNetWithIOCP(CPerSocketContextBase* pSocketContext) 48 { 49 if (GetIOCPObj()!= m_pIOCP->Associate((HANDLE)(pSocketContext->socket), GetIOCPObj(),(ULONG_PTR)pSocketContext)) 50 { 51 OutputDebugString(TEXT("監聽套接字關聯IOCP失敗: ") + GetLastErrorMessage(GetLastError()) + TEXT("\n")); 52 53 return false; 54 } 55 56 return true; 57 } 58 59 bool CSITBase::HandleError(CPerSocketContextBase* pSocketContext, DWORD dwError, DWORD dwThreadID) 60 { 61 return m_pIOCP->HandleError(pSocketContext, dwError, dwThreadID); 62 } 63 64 bool CSITBase::PostAccept(CPerIOContextBase* pAcceptContext) 65 { 66 return m_pSocketNet->PostAccept(pAcceptContext); 67 } 68 69 bool CSITBase::DoAccept(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext) 70 { 71 return m_pSocketNet->DoAccept(pSocketContext, pIOContext); 72 } 73 74 bool CSITBase::DoRecv(CPerSocketContextBase* pSocketContext, CPerIOContextBase* pIOContext) 75 { 76 return m_pSocketNet->DoRecv(pSocketContext, pIOContext); 77 } 78 79 void CSITBase::AddSocketContext(CPerSocketContextBase* pSocketContext) 80 { 81 m_pSITContext->AddSocketContext(pSocketContext); 82 } 83 84 void CSITBase::RemoveSocketContext(CPerSocketContextBase* pSocketContext) 85 { 86 m_pSITContext->RemoveSocketContext(pSocketContext); 87 } 88 89 CIOCPBase* CSITBase::GetIOCP() 90 { 91 return m_pIOCP; 92 } 93 94 CSITContextBase* CSITBase::GetSITContext() 95 { 96 return m_pSITContext; 97 } 98 99 CThreadpoolBase* CSITBase::GetThreadpool() 100 { 101 return m_pThreadpool; 102 } 103 104 CSocketNetBase* CSITBase::GetSocketNet() 105 { 106 return m_pSocketNet; 107 } 108 109 HANDLE CSITBase::GetIOCPObj() 110 { 111 return m_pIOCP->GetIOCPObj(); 112 } 113 114 CThreadpoolBase::CThreadpoolBase() 115 { 116 m_NumOfThreads = 0; 117 m_phWorkerThreads = NULL; 118 m_pSIT = NULL; 119 } 120 121 CThreadpoolBase::~CThreadpoolBase() 122 { 123 for (int i = 0; i < m_NumOfThreads;i++) 124 { 125 CloseHandle(m_phWorkerThreads[i]); 126 } 127 128 delete[] m_phWorkerThreads; 129 } 130 131 bool CThreadpoolBase::Init(CSITBase* pSIT) 132 { 133 if (pSIT->GetIOCP()==NULL) 134 { 135 OutputDebugString(TEXT("SIT缺少IOCP")); 136 137 return false; 138 } 139 140 m_pSIT = pSIT; 141 142 m_NumOfThreads = GetNumOfThreads(); 143 if (m_NumOfThreads<0) 144 { 145 OutputDebugString(TEXT("工作者線程數量小於0")); 146 147 return false; 148 } 149 150 m_phWorkerThreads = new HANDLE[m_NumOfThreads]; 151 152 for (int i = 0; i < m_NumOfThreads;i++) 153 { 154 PWorkerThreadUnit pWTU = new WorkerThreadUnit; 155 pWTU->pSIT = m_pSIT; 156 m_phWorkerThreads[i] = CreateThread(0, 0, WorkerThread, (LPVOID)pWTU, 0, &(pWTU->dwThreadID)); 157 } 158 159 OutputDebugString(TEXT("線程初始化完畢")); 160 161 return true; 162 } 163 164 int CThreadpoolBase::GetNumOfThreads(int nAdjustment /*= 2*/) 165 { 166 SYSTEM_INFO si; 167 GetSystemInfo(&si); 168 169 return si.dwNumberOfProcessors * 2 + nAdjustment; 170 } 171 172 DWORD WINAPI CThreadpoolBase::WorkerThread(LPVOID lpParam) 173 { 174 PWorkerThreadUnit pWTU = (PWorkerThreadUnit)lpParam; 175 CSITBase* pSIT = pWTU->pSIT; 176 DWORD dwThreadID = pWTU->dwThreadID; 177 178 CString strThreadInfo; 179 strThreadInfo.Format(TEXT("工作者線程 %d"), (int)dwThreadID); 180 OutputDebugString(strThreadInfo+TEXT("\n")); 181 182 BOOL bRes=FALSE; 183 DWORD dwTransfered = 0; 184 CPerSocketContextBase* pSocketContext=NULL; 185 OVERLAPPED* pOverlapped = NULL; 186 187 while (true) 188 { 189 bRes = GetQueuedCompletionStatus(pSIT->GetIOCPObj(), 190 &dwTransfered, 191 (PULONG_PTR)&pSocketContext, 192 &pOverlapped, 193 INFINITE); 194 195 if ((DWORD)pSocketContext==0) 196 { 197 OutputDebugString(TEXT("收到PostQueuedCompletionStatus調用,要求線程退出\n")); 198 break; 199 } 200 201 if (!bRes) 202 { 203 DWORD dwError = GetLastError(); 204 OutputDebugString(strThreadInfo + CString(TEXT(" 調用GetQueuedCompletionStatus失敗,") + GetLastErrorMessage(dwError))+TEXT("\n")); 205 206 if (!pSIT->HandleError(pSocketContext,dwError,dwThreadID)) 207