Go處理每分鐘100萬個請求

来源:https://www.cnblogs.com/phpper/archive/2020/06/30/13211343.html

引用原文 原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 問題描述 直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考 ...


引用原文

原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

問題描述

直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務非同步處理。那麼如果使用Go如何實現?

傳統上,我們會考慮使用以下方法創建工作者層架構:

  • Resque(隊列,比如redis resque)
  • DelayedJob(延遲任務,比如go defer)
  • Elasticbeanstalk Worker Tier
  • RabbitMQ(消息隊列)

簡單慣用法

golang的非同步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。

不停創建協程也有壓垮系統的風險。然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理非同步任務,原因是不可控。下麵我們引用原作者的demo,一個執行耗時任務的handler。

代碼我們只用看大致的實現流程原理,實現細節暫且不論。

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對於適量的負載,這個方案應該沒有問題。但是負載增加以後這個方法就不能很好地工作。當我們把這個版本部署到生產環境中後,如果我們遇到了比預期大一個數量級的請求量。

那麼這個方法就有些不盡如人意了。它無法控制創建goroutine的數量。因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。

這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎麼能讓系統更可控呢?

優雅的方法

創建帶緩衝的channel。這樣我們可以把工作任務放到隊列里然後再上傳到S3。因為可以控制隊列的長度並且有充足的記憶體,我覺得把工作任務緩存在channel隊列里應該沒有問題。

所以一個很自然的思路那就是:建立任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。但是這樣只是延後了請求的爆發。

作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裡我將作者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。

 

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一個任務
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一個任務的執行過程如下
JobQueue <- work  新任務入隊
job := <-JobQueue: 調度中心收到任務
jobChannel := <-d.WorkerPool 從工作者池取到一個工作者
jobChannel <- job 任務給到工作者
job := <-w.JobChannel 工作者取出任務
{{1}} 執行任務
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

 這樣,我們已經能夠主動的控制worker的數量。這時候,我問哈大家,我們完全解決問題了麽?如果有大量的任務同時涌入,會發生什麼樣的結果。程式會阻塞等待可用的worker

jobChannel := <-d.WorkerPool

下麵是我們的dispatcher實現代碼:

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

這裡我們提供了創建worker的最大數目作為參數,並把這些worker加入到worker池裡。不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們再改一下代碼如下:

package main

import (
    "fmt"
    "reflect"
    "runtime"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空閑worker (任務多的時候會阻塞這裡")
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一個任務
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("當前協程數:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

執行結果如下:

 

 

 

這裡我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且非同步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎麼處理?

真正控制協程數量(併發執行的任務數)

我們可以控制併發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩衝的Channel控制併發執行的任務數。

當任務非同步處理完成的時候執行<- DispatchNumControl釋放控制即可。用這種方法,

我們可以根據壓測結果設置合適的併發數從而保證系統能夠儘可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方:牛仔我很忙)。

 比如定義一個limit函數讀取是否存在發送的任務隊列:

//用於控制併發處理的協程數
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("牛仔我很忙")
      return false
   case DispatchNumControl <- true:
   // 任務放入任務隊列channal
      jobChannel <- work
      return true
   }
}

結束語

我們本可以通過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過度使用。

我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力。
另外綜合上面的實現。為什麼 dispatch 這裡要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言。。。。

 


您的分享是我們最大的動力!

更多相關文章
  • 在最近的一周,我維護的業務系統出現了很多壞毛病,一周七天crash掉了4次,每次都需要都是因為一點很小的問題,觸發了蝴蝶效應,導致整個系統全盤崩潰,於是產生除了敘述本篇的想法,當然這並不是為了掩蓋我在Coding上的一些細節處理和職責疏忽,只是為了從根本的細節上去分析這些問題。 (一、)為什麼會產生 ...
  • 需求:1、點擊新增一欄商品信息,表單驗證區分 2、輸入SKU編碼,帶出當前行的產品名稱,品牌及單位 解決: ...
  • 本文將以UI設計師轉型做web前端作為案例,詳細解讀學web前端該學習哪些專業知識!當然也適用於所有想轉型web前端的親們! 如何學習web前端知識轉型? 不少UI設計師想要學習web前端知識,一方面是喜歡那些華麗的網頁特效,另一方面則是想成為全棧式UI設計師。 1、HTML/CSS學習 第一階段, ...
  • 使用插件: vue-i18n 1. 安裝: npm install vue-i18n 2. 目錄結構 // i18n.js import Vue from 'vue' import VueI18n from 'vue-i18n' import messages from './lang' Vue.u ...
  • 預設表單提示是英文的,我們可以安裝語言包構建多語言環境。 根據版本選擇命令 For Laravel 7.x : run composer require caouecs/laravel-lang:~6.0 For Laravel 6.x : run composer require caouecs/ ...
  • 1.代碼生成器: [正反雙向](單表、主表、明細表、樹形表,快速開發利器)freemaker模版技術 ,0個代碼不用寫,生成完整的一個模塊,帶頁面、建表sql腳本、處理類、service等完整模塊2.多數據源:(支持同時連接無數個資料庫,可以不同的模塊連接不同數的據庫)支持N個數據源3.阿裡資料庫連 ...
  • 基於Nexus搭建私服 1. 工作流程 2. 倉庫類型 hosted 私服倉庫 proxy倉庫 遠程倉庫 group倉庫 組倉庫,裡面可以設置組合多個倉庫。按順序獲取jar。 3. 預設倉庫 安裝好了Nexus後,會內置幾個maven的預設倉庫。可自定義倉庫。 maven-central proxy ...
  • 1 前言 相信不少人聽過這麼一句話: 人類的本質是復讀機。 在軟體開發領域也一樣,我們總是想尋找更好地方式複製優秀的邏輯或系統。最核心的方法是抽取通用邏輯和組件,把差異化的東西介面化或配置化,達到復用的效果。如Java的Build Once, Run Everywhere,還有Spring的強大的抽 ...
一周排行
  • C#6.0新特性 C#7.0新特性 C#8.0新特性 ...
  • out變數 可以直接在方法中使用out申明變數 int.TryParse("123", out var result); 元組 元組的申明 var alphaBetaStart = (alpha: "a", beta: "b"); Console.WriteLine($"{alphaBetaStar ...
  • 在我們的項目中,通常會把數據存儲到關係型資料庫中,比如Oracle,SQL Server,Mysql等,但是關係型資料庫對於併發的支持並不是很強大,這樣就會造成系統的性能不佳,而且存儲的數據多為結構化數據,對於非結構數據(比如文本)和半結構化數據(比如JSon) 就顯得不夠靈活,而非關係型資料庫則很 ...
  • 這幾天終於弄懂了async和await的模式,也搞明白了一直在心裡面積壓著的許多問題,所以寫一篇博客來和大家分享一下。 關於非同步機制我認為只要記住的以下幾點,就可以弄明白了: 1.我認為async和awwait兩個修飾符中最關鍵的是await,async是由於方法中包含await修飾符之後才在方法定 ...
  • 實現WCF的步驟如下: 設計服務協議 實現服務協議 配置服務 托管服務 生成客戶端(這步可有可無) 設計或定義服務協議要麼使用介面,要麼使用類。建議介面,使用介面好處一堆例如修改介面的實現,但是服務協定有無需改變。 設計服務協議,介面上使用 ServiceContractAttribute ,方法上 ...
  • 什麼鬼,我的CPF快寫好了,你居然也要搞跨平臺UI框架?什麼Maui? 之前怎麼不早說要搞跨平臺UI框架呢?看到谷歌搞flutter眼紅了?明年年底發佈?又搞這種追別人屁股的爛事情。 什麼MVU模式?模仿Dart?用C#代碼直接寫UI的模式和我的CPF很像啊。 當初我考慮過XML,Json來描述UI ...
  • 寫在前面 Docker作為開源的應用容器引擎,可以讓我們很輕鬆的構建一個輕量級、易移植的容器,通過Docker方式進行持續交付、測試和部署,都是極為方便的,並且對於我們開發來說,最直觀的優點還是解決了日常開發中的環境配置與部署環境配置上的差異所帶來的種種疑難雜症,從此推脫產品的措辭也少了——“我電腦 ...
  • 一、前言 回顧:認證授權方案之授權初識 從上一節中,我們在對授權系統已經有了初步的認識和使用,可以發現,asp.net core為我們提供的授權策略是一個非常強大豐富且靈活的認證授權方案,能夠滿足大部分的授權場景。 在ConfigureServices中配置服務:將授權服務添加到容器 public ...
  • 項目背景: 工作之餘兼職一家公司(方向是工業4.0)給做IM系統,主要功能包括:文字、 圖片、文件傳輸、遠程協助、視頻語音等等。這些功能都是基於群會話, 比如工廠操作工人遇到問題,請求遠程專家,這個初級專家不能解決問題,會邀請一個高級專家進來解決。開發過程中主要遇到的問題是視頻和語音這一塊,像其他的... ...
  • 基礎概念 Microsoft中間語言(MSIL),也成為通用中間語言(CIL),是一組與平臺無關的指令,由特定於語言的編譯器從源代碼生成。MSIL是獨立於平臺的,因此,他可以在任何公共語言基礎架構支持特定的環境上執行。 通過JIT編譯器將MSIL轉換為特定電腦環境的特定機器代碼。這是在執行MSIL ...