拉模式和推模式 拉模式 1、數據更新頻率低,則大多數請求是無效的 2、線上用戶量多,則服務端的查詢負載高 3、定時輪詢拉取,實時性低 推模式 1、僅在數據更新時才需要推送 2、需要維護大量的線上長連接 3、數據更新後可以立即推送 基於webSocket推送 1、瀏覽器支持的socket編程,輕鬆維持 ...
拉模式和推模式
拉模式
1、數據更新頻率低,則大多數請求是無效的
2、線上用戶量多,則服務端的查詢負載高
3、定時輪詢拉取,實時性低
推模式
1、僅在數據更新時才需要推送
2、需要維護大量的線上長連接
3、數據更新後可以立即推送
基於webSocket推送
1、瀏覽器支持的socket編程,輕鬆維持服務端長連接
2、基於TCP可靠傳輸之上的協議,無需開發者關心通訊細節
3、提供了高度抽象的編程介面,業務開發成本較低
webSocket協議與交互
通訊流程
客戶端->upgrade->服務端
客戶端<-switching<-服務端
客戶端->message->服務端
客戶端<-message<-服務端
實現http服務端
1、webSocket是http協議upgrade而來
2、使用http標準庫快速實現空介面:/ws
webSocket握手
1、使用webSocket.Upgrader完成協議握手,得到webSocket長連接
2、操作webSocket api,讀取客戶端消息,然後原樣發送回去
封裝webSocket
缺乏工程化設計
1、其他代碼模塊,無法直接操作webSocket連接
2、webSocket連接非線程安全,併發讀/寫需要同步手段
隱藏細節,封裝api
1、封裝Connection結構,隱藏webSocket底層連接
2、封裝Connection的api,提供Send/Read/Close等線程安全介面
api原理(channel是線程安全的)
1、SendMessage將消息投遞到out channel
2、ReadMessage從in channel讀取消息
內部原理
1、啟動讀協程,迴圈讀取webSocket,將消息投遞到in channel
2、啟動寫協程,迴圈讀取out channel,將消息寫給webSocket
// server.go
package main
import (
"net/http"
"github.com/gorilla/websocket"
"./impl"
"time"
)
var (
upgrader = websocket.Upgrader{
//允許跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
var (
wsConn *websocket.Conn
err error
conn *impl.Connection
data []byte
)
//Upgrade:websocket
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = impl.InitConnection(wsConn); err != nil {
goto ERR
}
go func() {
var (
err error
)
for {
if err =conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
//關閉連接
conn.Close()
}
func main() {
//http:localhost:7777/ws
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe("0.0.0.0:7777", nil)
}
// connection.go
package impl
import (
"github.com/gorilla/websocket"
"sync"
"github.com/influxdata/platform/kit/errors"
)
var once sync.Once
type Connection struct {
wsConn *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
isClosed bool
mutex sync.Mutex
}
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
wsConn:wsConn,
inChan:make(chan []byte, 1000),
outChan:make(chan []byte, 1000),
closeChan:make(chan byte, 1),
}
//啟動讀協程
go conn.readLoop()
//啟動寫協程
go conn.writeLoop()
return
}
//API
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <- conn.inChan:
case <- conn.closeChan:
err = errors.New("connection is closed")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChan <- data:
case <- conn.closeChan:
err = errors.New("connection is closed")
}
return
}
func (conn *Connection) Close() {
// 線程安全的close,可重入
conn.wsConn.Close()
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
//內部實現
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.wsConn.ReadMessage(); err != nil {
goto ERR
}
//阻塞在這裡,等待inChan有空位置
//但是如果writeLoop連接關閉了,這邊無法得知
//conn.inChan <- data
select {
case conn.inChan <- data:
case <-conn.closeChan:
//closeChan關閉的時候,會進入此分支
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <- conn.outChan:
case <- conn.closeChan:
goto ERR
}
if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
conn.outChan <- data
}
ERR:
conn.Close()
}