Gobelieve 架構 Gobelieve github地址 im 客戶連接伺服器 (可分散式部署,暫無負載均衡模塊) imr 路由查詢伺服器(主要解決im分散式部署的問題) ims 存儲伺服器 (主從部署) 基礎模塊 1.數據包協議 包:header(12)|body header:len(4) ...
Gobelieve 架構
im 客戶連接伺服器 (可分散式部署,暫無負載均衡模塊)
imr 路由查詢伺服器(主要解決im分散式部署的問題)
ims 存儲伺服器 (主從部署)
基礎模塊
1.數據包協議
包:header(12)|body header:len(4),seq(4),cmd(1),version(1),空(2)
2.數據收發流程
accept收到一個連接
開啟寫線程和讀線程
寫線程:監聽client.wt阻塞隊列,一有數據就寫入conn
讀線程:按照數據包協議從conn讀出數據包,由client.HandleMessage處理
3.幾個方法
PushMessage 通過route_channel 發送 MSG_PUBLISH 給IMR PushGroupMessage 通過route_channel 發送 MSG_PUBLISH_GROUP 給IMR SaveMessage 通過IMS RPC服務 調用SavePeerMessage SaveGroupMessage 通過IMS RPC服務 調用SaveGroupMessage im_client.SendMessage 1.PushMessage 2.本地路由查詢 並EnqueueMessage im_client.SendGroupMessage 1.PushGroupMessage 2.group_manager查詢group 3.由group得到所有menber,對每個menber查詢路由表,並EnqueueMessage im_client.EnqueueMessage 將數據寫入client.wt,供發送出去
IM 模塊
IM模塊初始化
1.redis_pool
2.storage_pools 連接ims:3333
http伺服器讀取最近消息時調用
3.rpc_clients ims:13333
SyncMessage
SyncGroupMessage
SavePeerMessage
SaveGroupMessage
4.group_rpc_clients (可選)
5.route_channels 連接imr:4444
開啟讀寫線程
寫:從channel.wt管道取值併發送給imr
讀:從imr接受消息,並分發給當前im節點連接用戶
6.group_manager
1.load: 從mysql載入group,保存至 group_manager.groups 2.run: reids訂閱 case group_create、 group_disband、 group_member_add、 group_member_remove、 group_upgrade、 回調處理 增刪改查group_manager.groups case ping 臟數據檢測 3.ping: 每個五分鐘發送ping
7.group_message_deliver:普通群消息分發
1.init:創建本地存儲文件 2.run: 監聽wt管道,有數據表示有新消息寫入文件 讀取文件併發送
8.ListenRedis 禁言
redis訂閱 speak_forbidden
接受事件推送,從本地路由查詢到對應client,修改forbidden欄位。
9.SyncKeyService
從 group_sync_c 和 sync_c 管道取值,保存至redis
( 客戶端發送的 MSG_GROUP_SYNC_KEY 和 MSG_SYNC_KEY 消息會將消息內同步key寫入對應的管道 group_sync_c 和 sync_c)
10.StartHttpServer :6666
web伺服器
11.StartSocketIO :websocket
12.ListenClient :23000 處理客戶端連接
ListenClient 處理流程
1.登錄認證 cmd:MSG_AUTH_TOKEN
客戶端將uid與token傳給伺服器,由redis_pool查詢認證 認證成功: 1.由EnqueueMessage發送消息{cmd:MSG_AUTH_STATUS,status:0} 2.client.AddClient() 緩存本連接到本機路由表 3.client.IMClient.Login() 緩存本鏈接到IMR路由表 認證失敗: 由EnqueueMessage發送消息{cmd:MSG_AUTH_STATUS,status:1}
2.IMClient 處理消息類型
MSG_IM: 處理IM 同步消息
MSG_GROUP_IM: 處理Group 同步消息
MSG_INPUTING: 處理Inputing消息
MSG_RT: 處理實時消息
MSG_UNREAD_COUNT: 設置未讀消息數
MSG_SYNC: 客戶端請求同步最新消息
MSG_SYNC_KEY: 客戶端將SYNC_KEY 傳至服務端
MSG_SYNC_GROUP: 客戶端請求同步最新群消息
MSG_GROUP_SYNC_KEY: 客戶端將GROUP_SYNC_KEY 傳至服務端
RoomClient 消息類型
MSG_ENTER_ROOM:進入聊天室
MSG_LEAVE_ROOM:離開聊天室
MSG_ROOM_IM:聊天室IM消息
VOIPClient消息類型
MSG_VOIP_CONTROL: VOIP命令
CustomerClient消息類型
MSG_CUSTOMER: 顧客->客服
MSG_CUSTOMER_SUPPORT:客服->顧客
3.MSG_IM處理流程:
用戶A -> B
1.SaveMessage:保存消息到目標用戶B存儲隊列 (rpc->SavePeerMessage) 2.SaveMessage:保存消息到發送用戶A存儲隊列(供多點登錄同步消息) 3.PushMessage:外部推送消息給目標用戶B(由IMR尋路由)MSG_IM 4.SendMessage:發送同步消息給目標用戶B (外部推送+本地定址發送) MSG_SYNC_NOTIFY 5.SendMessage:發送同步消息給發送用戶A(多點登錄)MSG_SYNC_NOTIFY 6.EnqueueMessage:給本連接回覆MSG_ACK消息
4.MSG_GROUP_IM處理流程
1.由group_manager查詢到指定group 2.根據Group類型: 1.HandleSuperGroupMessage: SaveGroupMessage:保存MSG_GROUP_IM消息 (rpc->SaveGroupMessage) PushGroupMessage:外部推送群消息 MSG_GROUP_IM SendGroupMessage:發送群同步通知消息(外部推送+本地定址推送) MSG_SYNC_GROUP_NOTIFY 2.HandleGroupMessage: group_message_deliver: saveMessage:本地保存消息 MSG_PENDING_GROUP_MESSAGE ReadMessage:讀取消息 MSG_PENDING_GROUP_MESSAGE 對群每個成員:SaveMessage:保存MSG_GROUP_IM消息 PushMessage:外部推送消息 MSG_GROUP_IM SendMessage:發送同步消息 MSG_SYNC_NOTIFY 3.EnqueueMessage:給本連接回覆MSG_ACK消息
5.MSG_INPUTING:
SendMessage:發送給目標用戶
6.MSG_RT 實時消息處理流程
SendMessage:發送消息給目標用戶
7.MSG_UNREAD_COUNT 設置用戶未讀數:
由redis_pool操作 hashkey:users_$appid_$uid field:unread
8.消息同步流程:
服務端->客戶端 MSG_SYNC_NOTIFY: 客戶端: 1.isSyncing==false:sendSync 發送舊syncKey,請求消息MSG_SYNC,狀態切換為同步狀態 2.isSyncing==true: 同步狀態中,新的newSyncKey 保存在pendingSyncKey中,this.pendingSyncKey = newSyncKey; 客戶端->服務端:MSG_SYNC 服務端: 1.從客戶端傳來的sync_key得到last_id,(如果last_id==0,從redis取出最新sync_key) 2.rpc->SyncMessage:根據last_id取出緩存的最近消息msgs 3.EnqueueMessage:發送MSG_SYNC_BEGIN消息 (客戶端不做處理) 4.EnqueueMessage:迴圈發送msgs 5.EnqueueMessage:發送MSG_SYNC_END消息 其中包含sync_key為最後一條msg的MsgID 服務端->客戶端:MSG_SYNC_END 客戶端: 1.取出newSyncKey(如果newSyncKey>this.syncKey,客戶端保存newSyncKey,併發送給服務端MSG_SYNC_KEY) 2.切換同步狀態isSyncing = false; 3.如果this.pendingSyncKey > this.syncKey , 即在上次同步中有新的MSG_SYNC_NOTIFY消息傳給客戶端,則再次同步,sendSync發送syncKey,pendingSyncKey置零 客戶端->服務端: MSG_SYNC_KEY 服務端: 1.從sync_key得到last_id, 2.包裹成SyncHistory,寫入管道sync_c <- s

9.超級群同步流程
服務端->客戶端 MSG_SYNC_GROUP_NOTIFY 客戶端: 1.isSyncing==false:sendSync 發送舊syncKey,請求消息MSG_SYNC_GROUP,狀態切換為同步狀態 2.isSyncing==true: 同步狀態中,新的newSyncKey 保存在pendingSyncKey中,this.pendingSyncKey = newSyncKey; 客戶端->服務端 MSG_SYNC_GROUP 服務端: 1/從客戶端傳來的group_sync_key取出group_id,sync_key,last_id=sync_key,(如果last_id,從redis取出新的group_sync_key_$groupid) 2.rpc->SyncGroupMessage:根據last_id取出最近的群消息 msgs 3.EnqueueMessage:發送MSG_SYNC_GROUP_BEGIN 消息 4.EnqueueMessage:迴圈發送msgs 5.EnqueueMessage:發送MSG_SYNC_GROUP_END,其中sync_key為最後一條msg的MsgID 服務端->客戶端 MSG_SYNC_GROUP_END 客戶端: 1.取出GroupSyncKey.syncKey和當前syncKey(如果GroupSyncKey.syncKey較大,客戶端保存更新,併發送給服務端 MSG_GROUP_SYNC_KEY) 2.切換同步狀態isSyncing = false; 如果this.pendingSyncKey > this.syncKey , 即在上次同步中有新的MSG_SYNC_NOTIFY消息傳給客戶端,則再次同步,sendSync發送syncKey,pendingSyncKey置零 客戶端->服務端 MSG_GROUP_SYNC_KEY 服務端: 1.取出group_id last_id 2.包裹成SyncGroupHistory,寫入管道group_sync_c <- s
IMR模塊
1.redis_pool
2.group_manager
3.ListenClient :4444
1.MSG_SUBSCRIBE 2.MSG_UNSUBSCRIBE 3.MSG_SUBSCRIBE_ROOM 4.MSG_UNSUBSCRIBE_ROOM (以上四個均是對imr維護的路由表增刪改查) 5.MSG_PUBLISH 1.根據消息內容得到消息類型和目標用戶 2.查詢路由表,用戶如果離線則將消息放入第三方推送隊列 3.根據路由表得到用戶連接所在im節點,並把消息推送至該節點 第三部過濾條件: 1.消息類型不為 MSG_IM、MSG_GROUP_IM、MSG_CUSTOMER、MSG_CUSTOMER_SUPPORT、MSG_SYSTEM 2.目標IM節點和發送IM節點不是同一節點 6.MSG_PUBLISH_GROUP 1.根據消息內容得到消息類型和群 2.對群內每個成員查詢路由表,用戶如果離線則將消息放入第三方推送隊列 3.群發給所有IM節點 第三部過濾條件: 1.消息類型為 MSG_PUBLISH_GROUP 7.MSG_PUBLISH_ROOM 1.根據消息內容得到roomid 2.根據roomid查詢路由表得到所有節點 3.對每個節點發送消息 第三部過濾條件: 1.發送節點和目標節點不是同一節點

IMS模塊 (主從)
1.NewMaster
1.init:創建容器存儲 clients,創建隊列ewt 2.run:監聽隊列ewt,將ewt隊列內消息添加到緩存cache數組 cache每滿1000或者每隔1分鐘,執行SendBatch, SendBatch: 封裝消息 MSG_STORAGE_SYNC_MESSAGE_BATCH ,寫入每個從節點連接的消息隊列client.ewt
2.NewSlaver 監聽主節點(可選)
run:連接至主節點,連接成功發送 MSG_STORAGE_SYNC_BEGIN
迴圈讀取消息:
MSG_STORAGE_SYNC_MESSAGE storage.SaveSyncMessage(emsg)
MSG_STORAGE_SYNC_MESSAGE_BATCH storage.SaveSyncMessageBatch(mb)
3.waitSignal 處理中斷 SIGINT SIGTERM
storage.FlushPeerIndex() 將每個人最近消息的msgid寫入文件
storage.FlushGroupIndex()
4.ListenSyncClient master監聽 3334
處理從節點連接 RunLoop: 1.(初始化同步)接受 MSG_STORAGE_SYNC_BEGIN 消息,從中取得msgid, 根據msgid LoadSyncMessagesInBackground 查詢得到消息,併發送給從節點 2.將從節點連接添加至 clients 3.進入for迴圈,監聽消息隊列client.ewt併發送給從節點 4.迴圈break後 RemoveClient
5.ListenRPCClient :13333
SyncMessage
SyncGroupMessage
SavePeerMessage
SaveGroupMessage
GetNewCount
由im調用
6.ListenClient() 3333
對於每個連接: 1.init:創建寫管道wt 2.run :寫線程,從wt管道取數據併發送 讀線程,HandleMessage
7.HandleMessage
1.MSG_LOAD_OFFLINE 對IM: storage_client.LoadOfflineMessage響應 2.MSG_SAVE_AND_ENQUEUE 對IM:storage_client.SaveAndEnqueueMessage響應 3.MSG_DEQUEUE 對IM: storage_client.DequeueMessage響應 *4.MSG_LOAD_LATEST 對IM-StartHttpServer-LoadLatestMessage響應 *5.MSG_LOAD_HISTORY 對IM-StartHttpServer-LoadHistoryMessage響應 6.MSG_SAVE_AND_ENQUEUE_GROUP 對IM: storage_client.SaveAndEnqueueGroupMessage響應 7.MSG_DEQUEUE_GROUP 對IM :storage_client.DequeueGroupMessage響應 8.MSG_LOAD_GROUP_OFFLINE 對IM: storage_client.LoadGroupOfflineMessage響應 (上述除了4,5,暫無被IM模塊調用)

作者:JackieF777
鏈接:https://www.jianshu.com/p/8121d6e85282
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯繫作者獲得授權並註明出處。