源碼地址 https://gitee.com/bin-0821/chat-room-demo-go-websocket 關於websocket,上一篇文章講述瞭如何通過websocket進行服務端與客戶端的通信,本篇將會帶領大家把各個websocket進行相互通信,在開始之前,請確保有理解 1 go ...
源碼地址
https://gitee.com/bin-0821/chat-room-demo-go-websocket
關於websocket,上一篇文章講述瞭如何通過websocket進行服務端與客戶端的通信,本篇將會帶領大家把各個websocket進行相互通信,在開始之前,請確保有理解
1 go的通道
2 go的線程
3 gin基礎
事實上,websocket與websocket之間是無法進行直接相互通信的,需要我們將數據接收後,發送給另一個websocket鏈接,可以理解為
conn1.ReadJson(&data)
conn2.WriteJson(data)
而建立一個類似微信聊天一樣的,能進行多群聊,一對多,一對一的聊天,需要對websocket進行管理,本篇文章的重點便是如何管理好所有用戶的websocket連接,主要有以下方面
1,一個根據業務進行設計的數據結構
2,用戶上下線後conn的處理
3,用戶發送信息的群發或單發
首先,要搞清楚我們在做什麼,聊天室要實現的功能類似微信
a,b,c三人,1,2,3 三個聊天室
人員 加入的聊天室
a 1,2,3
b 1,2
c 1
a在1發送信息,全部人都能收到
a在2發送信息,c收不到,以此類推
a可以與c單獨發送信息,接收方不線上時,系統能正常運行
1. 目錄結構
相比上篇文章 多了manage_socket_conn,service兩個模塊,重點在於多了manage_socket_conn模塊中
D:.
│ go.mod
│ go.sum
│ main.go
│ msg.json
├─api
│ socket_conn.go
├─manage_socket_conn //用戶的websocket管理模塊
│ char_room_thread.go //線程 主要負責對信息的群發
│ room_set.go //聊天室房間管理,房間的創建,銷毀 存儲房間內的用戶id
│ user_set.go //用戶websocket鏈接管理,信息的發送,存儲所有線上的webscoket鏈接,用戶上下線
├─middleware
│ Cros.go
├─model
│ socket_msg_form_front.go
│ to_front.go
├─route
│ route.go
└─service
chat_room.go //數據層,模擬用戶加入了那些聊天室
2. 代碼內容
user_set.go
package manage_socket_conn
import (
"WebSocketDemo/model"
"errors"
"fmt"
"github.com/gorilla/websocket"
"sync"
)
func init() {
GetUserSet()
}
//用戶map 用來存儲每個線上的用戶id與對應的conn
type userSet struct {
// 用戶鏈接集 用戶id => 鏈接對象
users map[int]*websocket.Conn
lock sync.Mutex
once sync.Once
}
var us = new(userSet)
// 單例模式
func GetUserSet() *userSet {
us.once.Do(func() {
us.users = make(map[int]*websocket.Conn)
us.users[-1] = nil
us.lock = sync.Mutex{}
})
return us
}
// 用戶創建發起websocket連接
// join_type 加入模式
// 1 正常加入 占線無法加入
// 2 強制加入 即踢下線前者
func (u *userSet) ConnConnect(user_id, join_type int, conn *websocket.Conn) (int, error) {
u.lock.Lock()
defer u.lock.Unlock()
if join_type == 1 {
// 用戶id是否已經線上
if _, ok := u.users[user_id]; ok {
return 1, errors.New("該賬號已被登陸")
}
} else if join_type == 2 {
// 如果原用戶id 已經存在map內 進行銷毀擠出
if conn2, ok := u.users[user_id]; ok {
err := conn2.Close()
if err != nil {
fmt.Println(err)
}
delete(u.users, user_id)
}
// 重新加入
u.users[user_id] = conn
}
return -1, nil
}
// 鏈接斷開
func (u *userSet) ConnDisconnect(user_id int, conn *websocket.Conn) error {
u.lock.Lock()
defer u.lock.Unlock()
if conn2, ok := u.users[user_id]; ok {
if conn == conn2 {
delete(u.users, user_id)
}
} else {
// Log 不存在的鏈接申請斷開
}
return nil
}
// 對單個鏈接發送信息
func (u *userSet) SendMsgByUid(user_id int, msg interface{}) error {
var err error
if conn, ok := u.users[user_id]; ok {
err = conn.WriteJSON(msg)
} else {
err = errors.New("不存在的鏈接")
}
return err
}
// 對多個連接發送信息
func (u *userSet) SendMsgByUidList(user_id_list []int, msg interface{}) (id_list []int, err_list []error) {
for _, user_id := range user_id_list {
// 這裡判斷用戶是否自己,是自己就跳過
c := msg.(model.ChatMsg)
if c.ChatMsgType == 1 {
if (c.Data["form_user_id"].(int)) == user_id {
continue
}
}
if conn, ok := u.users[user_id]; ok {
err := conn.WriteJSON(msg)
if err != nil {
id_list = append(id_list, user_id)
err_list = append(err_list, err)
}
} else {
id_list = append(id_list, user_id)
err_list = append(err_list, errors.New("不存在的鏈接"))
}
}
return
}
room_set.go
package manage_socket_conn
import (
"sync"
)
//群map 用來存儲每個群線上的用戶id
type roomSet struct {
// 群id 群內的用戶id
rooms map[int]map[int]struct{}
lock sync.Mutex
once sync.Once
}
var rs = new(roomSet)
// 單例
func GetRoomSet() *roomSet{
rs.once.Do(func() {
rs.rooms = make(map[int]map[int]struct{})
rs.lock = sync.Mutex{}
})
return rs
}
// 向用戶發送
func (r *roomSet)SendMsgToUserList (r_id int ,msg interface{}){
userS := GetUserSet()
r.lock.Lock()
defer r.lock.Unlock()
var user_id_list []int
for key, _ := range r.rooms[r_id] {
user_id_list = append(user_id_list, key)
}
userS.SendMsgByUidList(user_id_list,msg)
}
// 用戶下線/退群 退出聊天室鏈接集合
func (r *roomSet) UserQuitRooms(room_id_list []int ,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
for _, room_id := range room_id_list {
if v ,ok := r.rooms[room_id];ok {
delete(v,user_id)
// 房間沒人就銷毀
if len(r.rooms[room_id]) <= 0 {
delete(r.rooms, room_id)
}
}
}
return
}
// 用戶上線/入群 加入聊天室連接集合
func (r *roomSet)UserJoinRooms(room_id_list []int,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
for _, room_id := range room_id_list {
if v,ok := r.rooms[room_id];!ok {
// 房間不存在就創建
r.rooms[room_id] = make(map[int]struct{})
r.rooms[room_id][user_id] = struct{}{}
}else {
v[user_id] = struct{}{}
}
}
return
}
// 用戶下線/退群 退出聊天室鏈接集合
func (r *roomSet) UserQuitRoom(room_id ,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
if v ,ok := r.rooms[room_id];ok {
delete(v,user_id)
// 房間沒人就銷毀
if len(r.rooms[room_id]) <= 0 {
delete(r.rooms, room_id)
}
}
return
}
// 用戶上線/入群 加入聊天室連接集合
func (r *roomSet)UserJoinRoom(room_id,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
if v,ok := r.rooms[room_id];!ok {
// 房間不存在就創建
r.rooms[room_id] = make(map[int]struct{})
r.rooms[room_id][user_id] = struct{}{}
}else {
v[user_id] = struct{}{}
}
return
}
char_room_thread.go
package manage_socket_conn
import (
"WebSocketDemo/model"
"fmt"
"sync"
)
var cRoomThread = new(charRoomThread)
type charRoomThread struct {
msgChannel chan model.ConnMsg
lock sync.Mutex
once sync.Once
}
// 向通道發送數據
func (c *charRoomThread)SendMsg(msg model.ConnMsg){
fmt.Println(msg)
c.msgChannel <- msg
}
// 單例
func GetCharRoomThread() *charRoomThread {
cRoomThread.once.Do(func() {
cRoomThread.msgChannel = make(chan model.ConnMsg,30)
cRoomThread.lock = sync.Mutex{}
})
return cRoomThread
}
// 啟動通道監聽
// ChatMsgType 1 群聊信息 2 一對一信息
func (c *charRoomThread) Start() {
for {
select {
case msg := <-c.msgChannel:
if msg.Msg.ChatMsgType == 1 {
// 標明發送方用戶id
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 在這裡你可以將聊天信息入庫等等操作
// do something
// 發送信息
// 註意 msg.Msg.Data["room_id"].(int) 這種寫法在data為nil時 運行時會 panic 導致整個系統停掉
// 所以在上一層最好對數據內容進行判斷,再把值發送到通道內
GetRoomSet().SendMsgToUserList(int(msg.Msg.Data["room_id"].(float64)),msg.Msg)
}else if msg.Msg.ChatMsgType == 2 {
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 如果發送不成功 說明接收方不線上
_ = GetUserSet().SendMsgByUid(int(msg.Msg.Data["to_user_id"].(float64)),msg.Msg)
}
}
}
}
main.go做以下修改
package main
import (
"WebSocketDemo/route"
Mg"WebSocketDemo/manage_socket_conn"
)
func main() {
ro := route.GetRoute()
go Mg.GetCharRoomThread().Start()
_ = ro.Run("0.0.0.0:8083")
}
api層 socket_conn.go 內容
package api
import (
Mg "WebSocketDemo/manage_socket_conn"
"WebSocketDemo/model"
Service "WebSocketDemo/service"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"net/http"
"strconv"
)
// websocket配置
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
}
func checkOrigin(r *http.Request) bool {
return true
}
var (
ServiceChatRoom Service.ChatRoom
)
// 用戶申請創建socket鏈接
func ConCreateConn(ctx *gin.Context) {
var (
conn *websocket.Conn
err error
user_id int
)
// 獲取user_id 這裡可以是token,經過中間件解析後的存在 ctx 的user_id
// 為方便演示 這裡直接請求頭帶user_id,正常開發不建議
user_id, err = strconv.Atoi(ctx.GetHeader("user_id"))
if err != nil && user_id <= 0 {
ctx.JSON(200, model.ResDatas(500, "請求必須帶user_id"+err.Error(), nil))
return
}
//fmt.Println("user_id", user_id)
// 判斷請求過來的鏈接是否要升級為websocket
if websocket.IsWebSocketUpgrade(ctx.Request) {
// 將請求升級為 websocket鏈接
conn, err = upgrader.Upgrade(ctx.Writer, ctx.Request, ctx.Writer.Header())
if err != nil {
ctx.JSON(200, model.ResDatas(500, "創建鏈接失敗"+err.Error(), nil))
return
}
}else {
return
}
// 獲取用戶加入的聊天室id數組
room_ids, _ := ServiceChatRoom.GetUserRoomIds(user_id)
// 用戶加入房間集
Mg.GetRoomSet().UserJoinRooms(room_ids,user_id)
// 用戶加入鏈接集
_,_ = Mg.GetUserSet().ConnConnect(user_id,2,conn)
// 用戶斷開銷毀
defer func() {
_ = conn.Close()
// 用戶斷開時也要銷毀在聊天集內的對象
_ = Mg.GetUserSet().ConnDisconnect(user_id,conn)
}()
for {
var msg model.ConnMsg
// ReadJSON 獲取值的方式類似於gin的 ctx.ShouldBind() 通過結構體的json映射值
// 如果讀不到值 則堵塞在此處
err = conn.ReadJSON(&msg)
if err != nil {
// 寫回錯誤信息
err = conn.WriteJSON(model.ResDatas(400, "獲取數據錯誤:"+err.Error(), nil))
if err != nil {
fmt.Println("用戶斷開")
return
}
}
// do something.....
msg.FormUserID = user_id
// 發送回信息
//err = conn.WriteJSON(msg)
if err != nil {
fmt.Println("用戶斷開")
return
}
if err = valMsg(msg);err != nil{
_ = conn.WriteJSON(model.ResDatas(400, "數據不合法:"+err.Error(), nil))
continue
}
// 將數據發送進通道
Mg.GetCharRoomThread().SendMsg(msg)
}
}
// 驗證數據 例如用戶是否有加入聊天室
func valMsg(msg model.ConnMsg) error {
// do something...
return nil
}
關於代碼的解釋在註釋里已經寫的非常清楚了,
主要是兩個結構體
//群map 用來存儲每個群線上的用戶id
type roomSet struct {
// 群id 群內的用戶id
rooms map[int]map[int]struct{}
lock sync.Mutex
once sync.Once
}
//用戶map 用來存儲每個線上的用戶id與對應的conn
type userSet struct {
// 用戶鏈接集 用戶id => 鏈接對象
users map[int]*websocket.Conn
lock sync.Mutex
once sync.Once
}
可能有人會問,roomSet.rooms[房間id]map[用戶id]struct{}
這裡的用戶集為什麼是map類型,而不是[]int類型
答:
想一下,當用戶下線或退出群聊時,怎麼在[]int內進行刪除該用戶的id,註意:此時的[]int是無序的,而加入群時,又要防止id重覆,所以實現起來過於麻煩,倒不如使用map,go底層為你封裝好的值判斷,使用起來會更方便,這裡的struct{}是沒有意義的,僅作為占位。
當我要把信息發送給群里的所有用戶時,先從roomSet根據房間id拿到用戶id map,將key轉化為[]int,調用userSet的 SendMsgByUidList()方法這樣就完成了信息的群發
而一對一的單發就不再重覆說了,跳過roomSet,直接發送
其他內容已經在代碼註釋里講得非常詳細了
3. 具體流程
// 1 main方法內 啟動一條線程 監聽從api層的socket_conn ConCreateConn()用戶寫入的發送的值
go Mg.GetCharRoomThread().Start()
// 2 ConCreateConn()方法內
// 獲取用戶加入的聊天室id數組
room_ids, _ := ServiceChatRoom.GetUserRoomIds(user_id)
// 用戶加入房間集
Mg.GetRoomSet().UserJoinRooms(room_ids,user_id)
// 用戶加入鏈接集
_,_ = Mg.GetUserSet().ConnConnect(user_id,2,conn)
defer func() {
_ = conn.Close()
// 用戶斷開時也要銷毀在聊天集內的對象
_ = Mg.GetUserSet().ConnDisconnect(user_id,conn)
}()
//3 用戶發送數據
Mg.GetCharRoomThread().SendMsg(msg)
//啟動的線程將會接收到通道的數據 char_room_thread
Mg.GetCharRoomThread().Start()
// 判斷數據發送類型進行發送
// ChatMsgType 1 群聊信息 2 一對一信息
if msg.Msg.ChatMsgType == 1 {
// 標明發送方用戶id
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 在這裡你可以將聊天信息入庫等等操作
// do something
// 發送信息
// 註意 msg.Msg.Data["room_id"].(int) 這種寫法在data為nil時 運行時會 panic 導致整個系統停掉
// 所以在上一層最好對數據內容進行判斷,再把值發送到通道內
GetRoomSet().SendMsgToUserList(int(msg.Msg.Data["room_id"].(float64)),msg.Msg)
}else if msg.Msg.ChatMsgType == 2 {
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 如果發送不成功 說明接收方不線上
_ = GetUserSet().SendMsgByUid(int(msg.Msg.Data["to_user_id"].(float64)),msg.Msg)
}
4. 啟動項目並測試
使用apipost進行websocket測試
用戶加入的群在servic層已經模擬了
func (ChatRoom) GetUserRoomIds(user_id int) (r_ids []int,err error) {
if user_id == 1 {
r_ids = []int{1,2,3}
}else if user_id == 2 {
r_ids = []int{1,2}
}else if user_id == 3 {
r_ids = []int{1}
}
return
}
user_id與介面名稱一樣,然後將所有websocket進行連接
發送json數據
{
"msg": {
"chat_msg_type": 1,
"data": {
"room_id": 2,
"content": "我是用戶1發送的信息,房間2,只有用戶3,不在房間"
}
}
}
房間2 只有用戶3不存在房間里所以用戶3接收不到信息
用戶2 發送信息
{
"msg": {
"chat_msg_type": 1,
"data": {
"room_id": 1,
"content": "我是用戶2發送的信息,房間1,所有人都可以接收"
}
}
}
可以看到所有人都可以接收信息
一對一測試
用戶1發送json給用戶3
{
"msg": {
"chat_msg_type": 2,
"data": {
"to_user_id": 3,
"content": "不要讓用戶2看到"
}
}
}
可以看到只有用戶2無法收到一對一的信息
5. 總結
關於websocket系列教程就已經結束了
本章重點在於如何進行設計一個websocket管理模塊,對線上的用戶進行管理
不足點:
由於怕篇幅過長,沒有將聊天數據存儲起來,實現原理便是在發送信息前把數據存入庫,搭配gorm的事務,當有錯誤時便回滾,
用戶上線時,前端獲取本地存儲的聊天數據id,拉取最後的數據列表,便可做到用戶上線讀取未接收的數據,這裡可以在api層直接實現
程式的所有問題,大部分都可以通過創造性的思想進行解決,希望本篇內容能對你有所幫助
歡迎大家點贊轉發
本文來自博客園,作者:樹杉,轉載請註明原文鏈接:https://www.cnblogs.com/xushushan/p/16492439.html
選擇了程式員這條路,那就要學習一輩子