引用原文 原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 問題描述 直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考 ...
引用原文
原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
問題描述
直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務非同步處理。那麼如果使用Go如何實現?
傳統上,我們會考慮使用以下方法創建工作者層架構:
- Resque(隊列,比如redis resque)
- DelayedJob(延遲任務,比如go defer)
- Elasticbeanstalk Worker Tier
- RabbitMQ(消息隊列)
簡單慣用法
golang的非同步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。
不停創建協程也有壓垮系統的風險。然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理非同步任務,原因是不可控。下麵我們引用原作者的demo,一個執行耗時任務的handler。
代碼我們只用看大致的實現流程原理,實現細節暫且不論。
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "time" ) type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
對於適量的負載,這個方案應該沒有問題。但是負載增加以後這個方法就不能很好地工作。當我們把這個版本部署到生產環境中後,如果我們遇到了比預期大一個數量級的請求量。
那麼這個方法就有些不盡如人意了。它無法控制創建goroutine的數量。因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。
這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎麼能讓系統更可控呢?
優雅的方法
創建帶緩衝的channel。這樣我們可以把工作任務放到隊列里然後再上傳到S3。因為可以控制隊列的長度並且有充足的記憶體,我覺得把工作任務緩存在channel隊列里應該沒有問題。
所以一個很自然的思路那就是:建立任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。但是這樣只是延後了請求的爆發。
作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裡我將作者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。
package main import ( "fmt" "reflect" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的工作 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的工作者單元 type Worker struct { WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal JobChannel chan Job //每個工作者單元包含一個任務管道 用於獲取任務 quit chan bool //退出信號 no int //編號 } //創建一個新工作者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("創建一個新工作者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //迴圈 監聽任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 停止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //工作者池 WorkerPool chan chan Job //工作者數量 MaxWorkers int } //創建調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工作者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閑worker (任務多的時候會阻塞這裡) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 20; i++ { // 新建一個任務 payLoad := Payload{Num: 1} work := Job{Payload: payLoad} // 任務放入任務隊列channal JobQueue <- work fmt.Println("JobQueue <- work") time.Sleep(1 * time.Second) } } /* 一個任務的執行過程如下 JobQueue <- work 新任務入隊 job := <-JobQueue: 調度中心收到任務 jobChannel := <-d.WorkerPool 從工作者池取到一個工作者 jobChannel <- job 任務給到工作者 job := <-w.JobChannel 工作者取出任務 {{1}} 執行任務 w.WorkerPool <- w.JobChannel 工作者在放回工作者池 */
這樣,我們已經能夠主動的控制worker的數量。這時候,我問哈大家,我們完全解決問題了麽?如果有大量的任務同時涌入,會發生什麼樣的結果。程式會阻塞等待可用的worker
jobChannel := <-d.WorkerPool
下麵是我們的dispatcher實現代碼:
//調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閑worker (任務多的時候會阻塞這裡) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } }
這裡我們提供了創建worker的最大數目作為參數,並把這些worker加入到worker池裡。不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們再改一下代碼如下:
package main import ( "fmt" "reflect" "runtime" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的工作 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的工作者單元 type Worker struct { WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal JobChannel chan Job //每個工作者單元包含一個任務管道 用於獲取任務 quit chan bool //退出信號 no int //編號 } //創建一個新工作者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("創建一個新工作者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //迴圈 監聽任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 停止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //工作者池 WorkerPool chan chan Job //工作者數量 MaxWorkers int } //創建調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工作者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { fmt.Println("等待空閑worker (任務多的時候會阻塞這裡") //等待空閑worker (任務多的時候會阻塞這裡) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 100; i++ { // 新建一個任務 payLoad := Payload{Num: i} work := Job{Payload: payLoad} // 任務放入任務隊列channal JobQueue <- work fmt.Println("JobQueue <- work", i) fmt.Println("當前協程數:", runtime.NumGoroutine()) time.Sleep(100 * time.Millisecond) } }
執行結果如下:
這裡我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且非同步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎麼處理?
真正控制協程數量(併發執行的任務數)
我們可以控制併發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩衝的Channel控制併發執行的任務數。
當任務非同步處理完成的時候執行<- DispatchNumControl
釋放控制即可。用這種方法,
我們可以根據壓測結果設置合適的併發數從而保證系統能夠儘可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方:牛仔我很忙)。
比如定義一個limit函數讀取是否存在發送的任務隊列:
//用於控制併發處理的協程數 var DispatchNumControl = make(chan bool, 10000) func Limit(work Job) bool { select { case <-time.After(time.Millisecond * 100): fmt.println("牛仔我很忙") return false case DispatchNumControl <- true: // 任務放入任務隊列channal jobChannel <- work return true } }
結束語
我們本可以通過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過度使用。
我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力。
另外綜合上面的實現。為什麼 dispatch 這裡要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言。。。。