Go創建Grpc鏈接池 轉載請註明來源:https://janrs.com/2023/03/%e5%88%9b%e5%bb%bagrpc%e9%93%be%e6%8e%a5%e6%b1%a0/ 常規用法 gRPC 四種基本使用 請求響應模式 客戶端數據流模式 服務端數據流模式 雙向流模式 常見的gR ...
Go創建Grpc鏈接池
常規用法
gRPC
四種基本使用
- 請求響應模式
- 客戶端數據流模式
- 服務端數據流模式
- 雙向流模式
常見的gRPC
調用寫法
func main(){
//... some code
// 鏈接grpc服務
conn , err := grpc.Dial(":8000",grpc.WithInsecure)
if err != nil {
//...log
}
defer conn.Close()
//...some code
存在的問題:面臨高併發的情況,性能問題很容易就會出現,例如我們在做性能測試的時候,就會發現,打一會性能測試,客戶端請求服務端的時候就會報錯:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
實際去查看問題的時候,很明顯,這是 gRPC 的連接數被打滿了,很多連接都還未完全釋放。[#本文來源:janrs.com#]
gRPC
的通信本質上也是 TCP
的連接,那麼一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這麼一個過程,如果我們頻繁的建立和釋放連接,這對於資源和性能其實都是一個大大的浪費。
在服務端,gRPC
服務端的鏈接管理不用我們操心,但是 gRPC
客戶端的鏈接管理非常有必要關心,要實現復用客戶端的連接。
創建鏈接池
創建鏈接池需要考慮的問題:
- 連接池是否支持擴縮容
- 空閑的連接是否支持超時自行關閉,是否支持保活
- 池子滿的時候,處理的策略是什麼樣的
創建鏈接池介面
type Pool interface {
// 獲取一個新的連接 , 當關閉連接的時候,會將該連接放入到池子中
Get() (Conn, error)
// 關閉連接池,自然連接池子中的連接也不再可用
Close() error
//[#本文來源:janrs.com#]
Status() string
}
實現鏈接池介面
創建鏈接池代碼
func New(address string, option Options) (Pool, error) {
if address == "" {
return nil, errors.New("invalid address settings")
}
if option.Dial == nil {
return nil, errors.New("invalid dial settings")
}
if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
return nil, errors.New("invalid maximum settings")
}
if option.MaxConcurrentStreams <= 0 {
return nil, errors.New("invalid maximun settings")
}
p := &pool{
index: 0,
current: int32(option.MaxIdle),
ref: 0,
opt: option,
conns: make([]*conn, option.MaxActive),
address: address,
closed: 0,
}
for i := 0; i < p.opt.MaxIdle; i++ {
c, err := p.opt.Dial(address)
if err != nil {
p.Close()
return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
}
p.conns[i] = p.wrapConn(c, false)
}
log.Printf("new pool success: %v\n", p.Status())
return p, nil
}
關於以上的代碼,需要特別註意每一個連接的建立也是在 New
裡面完成的,[#本文來源:janrs.com#]只要有 1
個連接未建立成功,那麼咱們的連接池就算是建立失敗,咱們會調用 p.Close()
將之前建立好的連接全部釋放掉。
關閉鏈接池代碼
// 關閉連接池
func (p *pool) Close() error {
atomic.StoreInt32(&p.closed, 1)
atomic.StoreUint32(&p.index, 0)
atomic.StoreInt32(&p.current, 0)
atomic.StoreInt32(&p.ref, 0)
p.deleteFrom(0)
log.Printf("[janrs.com]close pool success: %v\n", p.Status())
return nil
}
從具體位置刪除鏈接池代碼
// 清除從 指定位置開始到 MaxActive 之間的連接
func (p *pool) deleteFrom(begin int) {
for i := begin; i < p.opt.MaxActive; i++ {
p.reset(i)
}
}
銷毀具體的鏈接代碼
// 清除具體的連接
func (p *pool) reset(index int) {
conn := p.conns[index]
if conn == nil {
return
}
conn.reset()
p.conns[index] = nil
}
關閉鏈接
代碼
func (c *conn) reset() error {
cc := c.cc
c.cc = nil
c.once = false
// 本文博客來源:janrs.com
if cc != nil {
return cc.Close()
}
return nil
}
func (c *conn) Close() error {
c.pool.decrRef()
if c.once {
return c.reset()
}
return nil
}
在使用連接池通過 pool.Get()
拿到具體的連接句柄 conn
之後,會使用 conn.Close()
關閉連接,實際上也是會走到上述的 Close()
實現的位置,但是並未指定當然也沒有許可權顯示的指定將 once
置位為 false
,也就是對於調用者來說,是關閉了連接,對於連接池來說,實際上是將連接歸還到連接池中。
擴縮容
關鍵代碼
func (p *pool) Get() (Conn, error) {
// the first selected from the created connections
nextRef := p.incrRef()
p.RLock()
current := atomic.LoadInt32(&p.current)
p.RUnlock()
if current == 0 {
return nil, ErrClosed
}
if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// 本文博客來源:janrs.com
// the number connection of pool is reach to max active
if current == int32(p.opt.MaxActive) {
// the second if reuse is true, select from pool's connections
if p.opt.Reuse {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// the third create one-time connection
c, err := p.opt.Dial(p.address)
return p.wrapConn(c, true), err
}
// the fourth create new connections given back to pool
p.Lock()
current = atomic.LoadInt32(&p.current)
if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
// 2 times the incremental or the remain incremental ##janrs.com
increment := current
if current+increment > int32(p.opt.MaxActive) {
increment = int32(p.opt.MaxActive) - current
}
var i int32
var err error
for i = 0; i < increment; i++ {
c, er := p.opt.Dial(p.address)
if er != nil {
err = er
break
}
p.reset(int(current + i))
p.conns[current+i] = p.wrapConn(c, false)
}
// 本文博客來源:janrs.com
current += i
log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
p.current, current, increment, p.opt.MaxActive)
atomic.StoreInt32(&p.current, current)
if err != nil {
p.Unlock()
return nil, err
}
}
p.Unlock()
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
Get
代碼邏輯
- 先增加連接的引用計數,如果在設定
current*int32(p.opt.MaxConcurrentStreams)
範圍內,那麼直接取連接進行使用即可。 - 若當前的連接數達到了最大活躍的連接數,那麼就看我們新建池子的時候傳遞的
option
中的reuse
參數是否是true
,若是復用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接。 - 其餘的情況,就需要我們進行
2
倍或者1
倍的數量對連接池進行擴容了。
也可以在 Get
的實現上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數 nextRef
只有當前活躍連接數的 10%
的時候(這隻是一個例子),就可以考慮縮容了。
性能測試
有關鏈接池的創建以及性能測試