hello,大家好呀,我是小樓。 上篇文章《一言不合就重構》 說了我最近重構的一個系統,雖然重構完了,但還在灰度,這不,在灰度過程中又發現了一個問題。 背景 這個問題簡單說一下背景,如果不明白可以看上篇文章 ,不想看也沒關係,這是個通用的解法,後面我會總結抽象下。 在上篇文章的最後提到對每個摘除的地 ...
hello,大家好呀,我是小樓。
上篇文章《一言不合就重構》 說了我最近重構的一個系統,雖然重構完了,但還在灰度,這不,在灰度過程中又發現了一個問題。
背景
這個問題簡單說一下背景,如果不明白可以看上篇文章 ,不想看也沒關係,這是個通用的解法,後面我會總結抽象下。
在上篇文章的最後提到對每個摘除的地址做決策時,需要順序執行,且每一個要摘除的地址都要實時獲取該集群的地址信息,以便做出是否需要兜底的決策。
當被摘除的機器非常多時,獲取地址信息的請求量就會非常大,對註冊中心造成了不小的壓力。
請求數據源的介面如下所示(其中 cuuid 是集群的 id)
type Read interface {
ListClusterEndpoints(ctx context.Context, cuuid string) ([]ptypes.Endpoint, error)
}
相信大家也能理解這個非常簡單的背景並且能想到一些解法。每次決策需要按 cuuid 獲取集群,也就是單個單個地獲取實時集群地址信息,由於是實時信息,緩存首先排除,其次自然而然地能想到如果能將請求合併一下,是不是就能解決請求量大的問題?
難點
如果只是改邏輯合併一下請求,吭哧吭哧改代碼就完了,也不值得寫這篇文章了,如何改最少的代碼來實現合併請求才是最難的。
解法
那天遇到這個問題,晚上輾轉反側想到了這個解法,其實主要也是參考 Go http client 的實現,都說看源碼沒用,這不就是用處麽?
Read
數據源介面定義保持不變,也就是上層的業務代碼完全不用改,只需要把 ListClusterEndpoints
的實現換掉。
我們可以用一個隊列把每個請求入隊,入隊列以後,調用方阻塞,然後起一些協程去隊列里取一批請求參數,發起批量請求,響應之後喚醒阻塞的調用方。
為此,我們實現一個可以阻塞並被其他協程喚醒的工具:
type token struct {
value interface{}
err error
}
type Token chan token
func NewToken() Token {
return make(Token, 1)
}
func (t Token) Done(value interface{}, err error) {
t <- token{value: value, err: err}
}
func (t Token) Wait(timeout time.Duration) (value interface{}, err error) {
if timeout <= 0 {
tk := <-t
return tk.value, tk.err
}
select {
case tk := <-t:
return tk.value, tk.err
case <-time.After(timeout):
return nil, ErrTokenTimeout
}
}
其次,定義隊列和其他參數:
type DataSource struct {
paramCh chan param
readTimeout time.Duration
concurrency int
step int
}
type param struct {
cuuid string
token Token
}
替換掉原來 ListClusterEndpoints
的實現:
func (p *DataSource) ListClusterEndpoints(ctx context.Context, cuuid string) ([]ptypes.Endpoint, error) {
req := param{
cuuid: cuuid,
token: NewToken(),
}
select {
case p.paramCh <- req:
default:
return nil, fmt.Errorf("list cluster endpoints write channel failed")
}
value, err := req.token.Wait(p.readTimeout)
if err != nil {
return nil, err
}
eps, ok := value.([]ptypes.Endpoint)
if !ok {
return nil, fmt.Errorf("value is not endpoints")
}
return endpoints, nil
}
再起幾個協程來處理任務:
func (p *DataSource) startListClusterEndpointsLoop() {
for i := 0; i < p.concurrency; i++ {
go func() {
for {
reqs := p.getListClusterEndpointsReqFromChan()
p.doBatchListClusterEndpoints(reqs)
}
}()
}
}
最關鍵的是 getListClusterEndpointsReqFromChan
的實現,既不能讓協程空跑,這樣太消耗cpu,又要能及時地取到一批參數,我們採取的方法是先阻塞地獲取一個參數,如果沒數據則阻塞,如果有數據,繼續取,直到數量達到上限或者取不到數據為止,此時這一批數據就可以批量地進行調用了。
func (p *DataSource) getListClusterEndpointsReqFromChan() []param {
reqs := make([]param, 0)
select {
case req := <-p.paramCh:
reqs = append(reqs, req)
for i := 1; i < p.step; i++ {
select {
case reqNext := <-p.paramCh:
reqs = append(reqs, reqNext)
default:
break
}
}
}
return reqs
}
最後
這個方法很簡單,但是有一些要註意的地方,得做好監控,比如調用方單個請求的QPS、RT,實際批量請求的QPS、RT,這樣才好計算出處理協程開多少個合適,還有隊列寫入失敗、隊列長度等等監控,當容量不足時及時做出調整。
推薦閱讀
與本文相關的文章也順便推薦給你,如果覺得還不錯,記得關註
、點贊
、在看
、分享
。
搜索關註微信公眾號"捉蟲大師",後端技術分享,架構設計、性能優化、源碼閱讀、問題排查、踩坑實踐;