Go處理每分鐘100萬個請求

来源:https://www.cnblogs.com/phpper/archive/2020/06/30/13211343.html
-Advertisement-
Play Games

引用原文 原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 問題描述 直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考 ...


引用原文

原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

問題描述

直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務非同步處理。那麼如果使用Go如何實現?

傳統上,我們會考慮使用以下方法創建工作者層架構:

  • Resque(隊列,比如redis resque)
  • DelayedJob(延遲任務,比如go defer)
  • Elasticbeanstalk Worker Tier
  • RabbitMQ(消息隊列)

簡單慣用法

golang的非同步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。

不停創建協程也有壓垮系統的風險。然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理非同步任務,原因是不可控。下麵我們引用原作者的demo,一個執行耗時任務的handler。

代碼我們只用看大致的實現流程原理,實現細節暫且不論。

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對於適量的負載,這個方案應該沒有問題。但是負載增加以後這個方法就不能很好地工作。當我們把這個版本部署到生產環境中後,如果我們遇到了比預期大一個數量級的請求量。

那麼這個方法就有些不盡如人意了。它無法控制創建goroutine的數量。因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。

這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎麼能讓系統更可控呢?

優雅的方法

創建帶緩衝的channel。這樣我們可以把工作任務放到隊列里然後再上傳到S3。因為可以控制隊列的長度並且有充足的記憶體,我覺得把工作任務緩存在channel隊列里應該沒有問題。

所以一個很自然的思路那就是:建立任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。但是這樣只是延後了請求的爆發。

作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裡我將作者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。

 

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一個任務
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一個任務的執行過程如下
JobQueue <- work  新任務入隊
job := <-JobQueue: 調度中心收到任務
jobChannel := <-d.WorkerPool 從工作者池取到一個工作者
jobChannel <- job 任務給到工作者
job := <-w.JobChannel 工作者取出任務
{{1}} 執行任務
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

 這樣,我們已經能夠主動的控制worker的數量。這時候,我問哈大家,我們完全解決問題了麽?如果有大量的任務同時涌入,會發生什麼樣的結果。程式會阻塞等待可用的worker

jobChannel := <-d.WorkerPool

下麵是我們的dispatcher實現代碼:

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

這裡我們提供了創建worker的最大數目作為參數,並把這些worker加入到worker池裡。不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們再改一下代碼如下:

package main

import (
    "fmt"
    "reflect"
    "runtime"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空閑worker (任務多的時候會阻塞這裡")
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一個任務
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("當前協程數:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

執行結果如下:

 

 

 

這裡我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且非同步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎麼處理?

真正控制協程數量(併發執行的任務數)

我們可以控制併發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩衝的Channel控制併發執行的任務數。

當任務非同步處理完成的時候執行<- DispatchNumControl釋放控制即可。用這種方法,

我們可以根據壓測結果設置合適的併發數從而保證系統能夠儘可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方:牛仔我很忙)。

 比如定義一個limit函數讀取是否存在發送的任務隊列:

//用於控制併發處理的協程數
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("牛仔我很忙")
      return false
   case DispatchNumControl <- true:
   // 任務放入任務隊列channal
      jobChannel <- work
      return true
   }
}

結束語

我們本可以通過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過度使用。

我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力。
另外綜合上面的實現。為什麼 dispatch 這裡要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言。。。。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在最近的一周,我維護的業務系統出現了很多壞毛病,一周七天crash掉了4次,每次都需要都是因為一點很小的問題,觸發了蝴蝶效應,導致整個系統全盤崩潰,於是產生除了敘述本篇的想法,當然這並不是為了掩蓋我在Coding上的一些細節處理和職責疏忽,只是為了從根本的細節上去分析這些問題。 (一、)為什麼會產生 ...
  • 需求:1、點擊新增一欄商品信息,表單驗證區分 2、輸入SKU編碼,帶出當前行的產品名稱,品牌及單位 解決: ...
  • 本文將以UI設計師轉型做web前端作為案例,詳細解讀學web前端該學習哪些專業知識!當然也適用於所有想轉型web前端的親們! 如何學習web前端知識轉型? 不少UI設計師想要學習web前端知識,一方面是喜歡那些華麗的網頁特效,另一方面則是想成為全棧式UI設計師。 1、HTML/CSS學習 第一階段, ...
  • 使用插件: vue-i18n 1. 安裝: npm install vue-i18n 2. 目錄結構 // i18n.js import Vue from 'vue' import VueI18n from 'vue-i18n' import messages from './lang' Vue.u ...
  • 預設表單提示是英文的,我們可以安裝語言包構建多語言環境。 根據版本選擇命令 For Laravel 7.x : run composer require caouecs/laravel-lang:~6.0 For Laravel 6.x : run composer require caouecs/ ...
  • 1.代碼生成器: [正反雙向](單表、主表、明細表、樹形表,快速開發利器)freemaker模版技術 ,0個代碼不用寫,生成完整的一個模塊,帶頁面、建表sql腳本、處理類、service等完整模塊2.多數據源:(支持同時連接無數個資料庫,可以不同的模塊連接不同數的據庫)支持N個數據源3.阿裡資料庫連 ...
  • 基於Nexus搭建私服 1. 工作流程 2. 倉庫類型 hosted 私服倉庫 proxy倉庫 遠程倉庫 group倉庫 組倉庫,裡面可以設置組合多個倉庫。按順序獲取jar。 3. 預設倉庫 安裝好了Nexus後,會內置幾個maven的預設倉庫。可自定義倉庫。 maven-central proxy ...
  • 1 前言 相信不少人聽過這麼一句話: 人類的本質是復讀機。 在軟體開發領域也一樣,我們總是想尋找更好地方式複製優秀的邏輯或系統。最核心的方法是抽取通用邏輯和組件,把差異化的東西介面化或配置化,達到復用的效果。如Java的Build Once, Run Everywhere,還有Spring的強大的抽 ...
一周排行
    -Advertisement-
    Play Games
  • 比如要拆分“呵呵呵90909086676喝喝999”,下麵當type=0返回的是中文字元串“呵呵呵,喝喝”,type=1返回的是數字字元串“90909086676,999”, private string GetStrings(string str,int type=0) { IList<strin ...
  • Swagger一個優秀的Api介面文檔生成工具。Swagger可以可以動態生成Api介面文檔,有效的降低前後端人員關於Api介面的溝通成本,促進項目高效開發。 1、使用NuGet安裝最新的包:Swashbuckle.AspNetCore。 2、編輯項目文件(NetCoreTemplate.Web.c ...
  • 2020 年 7 月 30 日, 由.NET基金會和微軟 將舉辦一個線上和為期一天的活動,包括 微軟 .NET 團隊的演講者以及社區的演講者。本次線上大會 專註.NET框架構建微服務,演講者分享構建和部署雲原生應用程式的最佳實踐、模式、提示和技巧。有關更多信息和隨時瞭解情況:https://focu... ...
  • #abp框架Excel導出——基於vue #1.技術棧 ##1.1 前端採用vue,官方提供 UI套件用的是iview ##1.2 後臺是abp——aspnetboilerplate 即abp v1,https://github.com/aspnetboilerplate/aspnetboilerp ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 作者:碧茂大數據 PS:如有需要Python學習資料的小伙伴可以加下方的群去找免費管理員領取 input()輸入 Python提供了 input() 內置函數從標準輸入讀入一 ...
  • 從12年到20年,python以肉眼可見的趨勢超過了java,成為了當今It界人人皆知的編程語言。 python為什麼這麼火? 網路編程語言搜索指數 適合初學者 Python具有語法簡單、語句清晰的特點,這就讓初學者在學習階段可以把精力集中在編程對象和思維方法上。 大佬都在用 Google,YouT ...
  • 在社會上存在一種普遍的對培訓機構的學生一種歧視的現象,具體表現在,比如:當你去公司面試的時候,一旦你說了你是培訓機構出來的,那麼基本上你就涼了,那麼你瞞著不說,然後又通過了面試成功入職,但是以後一旦在公司被髮現有培訓經歷,可能會面臨被降薪,甚至被辭退,培訓機構出來的學生,在用人單位眼裡就是能力低下的 ...
  • from typing import List# 這道題看了大佬寫的代碼,經過自己的理解寫出來了。# 從最外圍的四周找有沒有為O的,如果有的話就進入深搜函數,然後深搜遍歷# 判斷上下左右的位置是否為Oclass Solution: def solve(self, board: List[List[s ...
  • import requests; import re; import os; # 1.請求網頁 header = { "user-agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, li ...
  • import requests; import re; import os; import parsel; 1.請求網頁 header = { "user-agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537. ...