使用CBrother腳本做TCP伺服器與C++客戶端通信 工作中總是會遇到一些對於伺服器壓力不是特別大,但是代碼量比較多,用C++寫起來很不方便。對於這種需求,我選擇用CBrother腳本做伺服器,之所以不選擇Python是因為python的語法我實在是適應不了,再來CBrother的網路框架也... ...
使用CBrother腳本做TCP伺服器與C++客戶端通信
工作中總是會遇到一些對於伺服器壓力不是特別大,但是代碼量比較多,用C++寫起來很不方便。對於這種需求,我選擇用CBrother腳本做伺服器,之所以不選擇Python是因為python的語法我實在是適應不了,再來CBrother的網路框架也是用C++封裝的非同步IO,性能還是很有保證的。
廢話不多說,先來看下伺服器代碼,我這裡只是記錄一個例子,不是全部代碼,方便後面做項目的時候直接來自己博客複製代碼修改。
1 import CBSocket.code //載入Socket擴展 2 3 var g_tcpModule = null; //全局保存TCP模塊對象 4 5 const MSG_TYPE_USER = 1; //客戶端發來的消息 6 const MSG_TYPE_CLOSE = 2; //socket斷線的消息 7 8 const LOGIC_MSG_LOGIN = 1; //登陸消息全局定義 9 const LOGIC_MSG_GETID = 2; //獲取ID消息全局定義 10 11 function main(a) //入口函數,cbrother從這裡開始執行 12 { 13 var thread = new Thread(); //啟動一個數據管理線程,串列處理全局數據,可以根據不同業務啟動多個 14 thread.setThreadAction(new ThreadAction()); 15 thread.start(); 16 17 g_tcpModule = new TcpModule(); //啟動一個TCP模塊 18 var tcpAction = new TcpAction(); 19 tcpAction.thread = thread; 20 g_tcpModule.setTcpAction(tcpAction); //設置TCP的處理類為TcpAction 21 g_tcpModule.addListenPort(6061,"0.0.0.0"); //監聽6061埠 22 g_tcpModule.start(); 23 24 print "tcpServer start!"; 25 26 while(1) 27 { 28 Sleep(1000); 29 } 30 }
TcpAction主要處理tcpmodule的消息回調
1 class SocketBuf //這個類會給每個socket創建一個,用來拼包,因為tcp在傳輸過程中並不保證每次收到都是整包數據 2 { 3 var _byteArray = new ByteArray(1024 * 10); //每個socket預留10K的緩衝 4 5 function SocketBuf() 6 { 7 _byteArray.setLittleEndian(true); //c++編碼為低位編址(LE) 如果要高位編碼c++裡面可以htonl 8 } 9 10 function PushData(bytes,len) 11 { 12 print "pushdata " + len; 13 if(!_byteArray.writeBytes(bytes,len)) 14 { 15 print "socket buf is full!"; 16 return false; 17 } 18 19 return true; 20 } 21 22 function CheckPack() 23 { 24 print "begin CheckPack!"; 25 var writePos = _byteArray.getWritePos(); 26 print "checkpack " + writePos; 27 if(writePos < 4) //前4個位元組表示包的長度 28 { 29 print "CheckPack null < 4"; 30 return null; 31 } 32 33 var msglen = _byteArray.readInt(); 34 print "checkpack " + msglen; 35 if(writePos < msglen + 4) //緩存里的數據還不夠一包數據,繼續等待數據 36 { 37 print "CheckPack null writePos < msglen + 4"; 38 _byteArray.setReadPos(0); 39 return null; 40 } 41 42 //夠一包數據了,取出數據,到另一個線程里去處理 43 var newBytes = _byteArray.readBytes(msglen); 44 newBytes.setLittleEndian(true); 45 46 var readPos = _byteArray.getReadPos(); 47 48 _byteArray.copy(_byteArray,0,readPos,writePos - readPos); 49 _byteArray.setReadPos(0); 50 _byteArray.setWritePos(writePos - readPos); 51 print "writePos:" + writePos; 52 print "readPos:" + readPos; 53 //XORCode(newBytes); //異或解密一下,這個為了安全性考慮。我在另一篇博客里專門寫這個函數與C++加密函數的對應關係 54 return newBytes; 55 } 56 } 57 58 class TcpAction 59 { 60 var thread; //這個是邏輯線程,這個例子里我只啟動一個邏輯線程 61 var sockMap = new Map(); //保存每個socket的消息緩衝,拼包用 62 var lock = new Lock(); //sockMap的鎖 63 64 function OnAccept(sock) 65 { 66 print "accept " + sock + " " + g_tcpModule.getRemoteIP(sock); 67 68 //監聽到客戶端連接,管理起來 69 lock.lock(); 70 var socketbuf = new SocketBuf(); 71 sockMap.add(sock,socketbuf); 72 lock.unlock(); 73 } 74 75 function OnClose(sock) 76 { 77 print "onclose " + sock; 78 79 //斷線了,移除掉,並通知邏輯線程 80 lock.lock(); 81 sockMap.remove(sock); 82 lock.unlock(); 83 84 var newmsg = new ThreadMsg(sock,null); 85 newmsg.type = MSG_TYPE_CLOSE; 86 thread.addMsg(newmsg); 87 } 88 89 function OnRecv(sock,byteArray,len) 90 { 91 print "onrecv " + sock + " len:" + len; 92 93 //收到數據獲取socket緩衝 94 lock.lock(); 95 var socketbuf = sockMap.get(sock); 96 lock.unlock(); 97 98 if(socketbuf == null) 99 { 100 return; //應該是被關掉了 101 } 102 103 if(!socketbuf.PushData(byteArray,len)) //數據壓進去 104 { 105 g_tcpModule.closeSocket(sock); 106 return;//buf滿了都解不開包,說明數據有問題,關了它 107 } 108 109 //把包解出來丟到邏輯線程去處理,迴圈是因為怕buf里同時又好幾包數據 110 var newBytes = socketbuf.CheckPack(); 111 while(newBytes != null) 112 { 113 thread.addMsg(new ThreadMsg(sock,newBytes)); 114 newBytes = socketbuf.CheckPack(); 115 } 116 } 117 }
最後是邏輯線程里,其實上面的代碼寫好了基本上就不動了,後續添加消息號增加處理都是在邏輯線程里去,所以上面的代碼可以封裝一下,我這裡是為了自己好記憶,所以就這樣寫了。
1 //這個類是線程消息類,可以理解為一個結構體 2 class ThreadMsg 3 { 4 function ThreadMsg(id,bytes) 5 { 6 socketid = id; 7 byteArray = bytes; 8 type = MSG_TYPE_USER; 9 } 10 11 var socketid; 12 var type; 13 var byteArray; 14 } 15 16 class ThreadAction 17 { 18 var _userMap = new Map(); //用戶名索引用戶信息 19 var _socketMsp = new Map(); //Socket索引用戶信息 20 21 var _funcMap = new Map(); //消息對應的處理函數 22 23 function onInit() 24 { 25 print "thread init" ; 26 27 //線程啟動時讀取資料庫數據 因為篇幅問題不寫載入資料庫的部分了,固定插入兩條數據做模擬數據 28 //LoadDB(); 29 _userMap.add("zhangsan",new UserData("zhangsan","123123",1)); 30 _userMap.add("lisi",new UserData("lisi","321321",2)); 31 32 _funcMap.add(LOGIC_MSG_LOGIN,onLogin); //註冊消息號的處理函數 33 _funcMap.add(LOGIC_MSG_GETID,onGetID); 34 } 35 36 function onMsg(msg) 37 { 38 switch(msg.type) 39 { 40 case MSG_TYPE_CLOSE: 41 { 42 //斷線消息 43 print "MSG_TYPE_CLOSE"; 44 OnClose(msg.socketid); 45 break; 46 } 47 case MSG_TYPE_USER: 48 { 49 //客戶端發來的消息,客戶端發來的數據結構都是從struct MsgBase派生,所以前4個位元組就是struct MsgBase的msgid 50 var msgid = msg.byteArray.readInt(); 51 var func = _funcMap.get(msgid); 52 print "MSG_TYPE_USER" + msgid; 53 if(func != null) 54 { 55 func.invoke(msg.socketid,msg.byteArray); 56 } 57 break; 58 } 59 } 60 } 61 62 function onEnd() 63 { 64 print "thread end"; 65 } 66 67 function SendData(socketid,byteArray) //發送數據給客戶端的函數 68 { 69 //XORCode(byteArray); //異或加密一下 70 var lenByte = new ByteArray(); 71 lenByte.setLittleEndian(true); 72 lenByte.writeInt(byteArray.getWritePos()); 73 74 g_tcpModule.sendData(socketid,lenByte); 75 g_tcpModule.sendData(socketid,byteArray); 76 } 77 78 function OnClose(socketid) 79 { 80 //關閉的時候從socket管理里移除,清理線上狀態 81 var userdata = _socketMsp.get(socketid); 82 if(userdata == null) 83 { 84 return; 85 } 86 userdata._IsOnLine = false; 87 userdata._SocketID = 0; 88 _socketMsp.remove(socketid); 89 } 90 91 function onLogin(socketid,byteArray) 92 { 93 print "onLogin"; 94 95 var namebuf = byteArray.readBytes(32); //這個長度要跟C++結構體對應 char name[32]; 96 var name = namebuf.readString(); 97 var pwdbuf = byteArray.readBytes(32); //這個長度要跟C++結構體對應 char pwd[32]; 98 var pwd = pwdbuf.readString(); 99 100 print name; 101 print pwd; 102 103 var userdata = _userMap.get(name); 104 if(userdata == null) 105 { 106 //沒有找到用戶名,用戶名錯誤 107 var resBytes = new ByteArray(); 108 resBytes.writeInt(LOGIC_MSG_LOGIN); 109 resBytes.writeInt(1); //回覆1表示帳號或者密碼錯誤錯誤 110 SendData(socketid,resBytes); 111 print "name err!"; 112 return; 113 } 114 115 if(pwd != userdata._UserPwd) 116 { 117 var resBytes = new ByteArray(); 118 resBytes.writeInt(LOGIC_MSG_LOGIN); 119 resBytes.writeInt(1); //回覆1表示帳號或者密碼錯誤錯誤 120 SendData(socketid,resBytes); 121 print "pwd err!"; 122 return; 123 } 124 125 if(userdata._IsOnLine) //這個帳號已經登錄過了,衝掉 126 { 127 g_tcpModule.closeSocket(userdata._SocketID); 128 OnClose(userdata._SocketID); 129 } 130 131 //登陸成功,添加進socket管理,並至線上狀態 132 userdata._IsOnLine = true; 133 userdata._SocketID = socketid; 134 _socketMsp.add(socketid,userdata); 135 136 var resBytes = new ByteArray(); 137 resBytes.setLittleEndian(true); 138 resBytes.writeInt(LOGIC_MSG_LOGIN); 139 resBytes.writeInt(2); //回覆2表示登錄成功 140 SendData(socketid,resBytes); 141 print "login suc!"; 142 } 143 144 function onGetID(socketid,byteArray) 145 { 146 var userdata = _socketMsp.get(socketid); 147 if(userdata == null) 148 { 149 return; //該socket不線上,不處理 150 } 151 152 var resBytes = new ByteArray(); 153 resBytes.setLittleEndian(true); 154 resBytes.writeInt(LOGIC_MSG_GETID); 155 resBytes.writeInt(userdata._UserID); 156 SendData(socketid,resBytes); 157 } 158 }
伺服器代碼完成了,再來看看C++客戶端代碼。
1 enum 2 { 3 LOGIC_MSG_LOGIN = 1, //登陸 4 LOGIC_MSG_GETID = 2, //獲取ID 5 }; 6 7 struct MsgBase //消息基類 8 { 9 int msgid; 10 }; 11 12 struct MsgLogin : public MsgBase //登陸消息 13 { 14 char name[32]; 15 char pwd[32]; 16 }; 17 18 struct MsgLoginRet : public MsgBase //登陸返回 19 { 20 int res; 21 }; 22 23 struct MsgGetID : public MsgBase //獲取ID消息 24 { 25 }; 26 27 struct MsgGetIDRet : public MsgBase //獲取ID返回 28 { 29 int userid; 30 }; 31 32 //接收伺服器消息,將數據放到recvBuf里 33 bool RecvData(int sock,char* recvBuf) 34 { 35 int alllen = 0; 36 int len = recv(sock,recvBuf,4,0); //先讀4個位元組為消息長度 37 if (len <= 0) 38 { 39 return false; //socket被關閉了 40 } 41 42 alllen += len; 43 while (alllen < 4) 44 { 45 len = recv(sock,recvBuf + len,4 - len,0); 46 if (len <= 0) 47 { 48 return false; //socket被關閉了 49 } 50 51 alllen += len; 52 } 53 54 int msglen = *((int*)recvBuf); 55 56 //再將消息內容讀入recvBuf 57 alllen = 0; 58 len = recv(sock,recvBuf,msglen,0); 59 if (len <= 0) 60 { 61 return false; //socket被關閉了 62 } 63 64 alllen += len; 65 while (alllen < msglen) 66 { 67 len = recv(sock,recvBuf + len,msglen - len,0); 68 if (len <= 0) 69 { 70 return false; //socket被關閉了 71 } 72 73 alllen += len; 74 } 75 76 return true; 77 } 78 79 //發送數據 80 bool SendData(int sock,MsgBase* pbase,int len) 81 { 82 //XORBuf((char*)pbase,sizeof(len)); //異或加密,下一篇博客專門寫這個函數 83 84 send(sock,(const char*)&len,4,0); //發送長度 85 send(sock,(const char*)pbase,len,0); //發送數據 86 return true; 87 } 88 89 int _tmain(int argc, _TCHAR* argv[]) 90 { 91 WORD sockVersion = MAKEWORD(2, 2); 92 WSADATA wsaData; 93 if (WSAStartup(sockVersion, &wsaData) != 0) 94 { 95 return 0; 96 } 97 98 int clinetsocket = socket(PF_INET, SOCK_STREAM, 0); 99 if (clinetsocket == -1) 100 { 101 return 0; 102 } 103 104 struct hostent *hptr = gethostbyname("127.0.0.1"); 105 106 struct sockaddr_in address; 107 address.sin_family = AF_INET; 108 address.sin_addr.s_addr = *(u_long*)hptr->h_addr_list[0]; 109 address.sin_port = htons(6061); 110 111 int result = connect(clinetsocket, (struct sockaddr *)&address, sizeof(address)); 112 if (result == -1) 113 { 114 return NULL; 115 } 116 117 //定義登陸消息結構 118 MsgLogin msg; 119 msg.msgid = LOGIC_MSG_LOGIN; 120 strcpy(msg.name,"lisi"); 121 strcpy(msg.pwd,"321321"); 122 int len = sizeof(msg); 123 SendData(clinetsocket,&msg,len); 124 125 char recvBuf[1024] = {0}; 126 127 while (true) 128 { 129 if(!RecvData(clinetsocket,recvBuf)) 130 { 131 printf("socket close!\n"); //連接斷了 132 break; 133 } 134 135 //收到的數據先轉為pBase 看前4個位元組的msgid 136 MsgBase* pBase = (MsgBase*)recvBuf; 137 138 switch (pBase->msgid) 139 { 140 case LOGIC_MSG_LOGIN: 141 { 142 //登陸返回 143 MsgLoginRet* mlr = (MsgLoginRet*)pBase; 144 if (mlr->res == 1) 145 { 146 printf("login err!\n"); 147 } 148 else 149 { 150 printf("login suc!\n"); 151 152 //請求ID 153 MsgGetID msggetid; 154 msggetid.msgid = LOGIC_MSG_GETID; 155 len = sizeof(msggetid); 156 SendData(clinetsocket,&msggetid,len); 157 } 158 break; 159 } 160 case LOGIC_MSG_GETID: 161 { 162 //請求ID返回 163 MsgGetIDRet* mgir = (MsgGetIDRet*)pBase; 164 printf("userid : %d\n",mgir->userid); 165 break; 166 } 167 } 168 } 169 170 return 0; 171 }
這樣客戶端和伺服器就都完成了,下麵再來記錄一下C++消息結構序列化後的二進位流。
MsgBase為所有消息的基類,所以從它派生的結構體前4個位元組肯定是整形的msgid。在服務端直接readInt讀取前4個位元組就表示讀取了MsgBase里的msgid。
MsgLogin有兩個成員變數都為char[32]數組,所以這個結構體的總位元組大小是64,除掉前4個位元組是msgid意外,readBytes(32)表示讀取這個數組,再readString表示獲取\0結尾的字元串
MsgLoginRet只有一個成員變數,所以伺服器第一個writeInt表示填充基類的msgid,第二個writeInt表示res。
之後的邏輯就都是添加消息號和消息結構做邏輯了,用腳本做伺服器編碼效率還是非常高的。