Go處理每分鐘100萬個請求

来源:https://www.cnblogs.com/phpper/archive/2020/06/30/13211343.html
-Advertisement-
Play Games

引用原文 原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 問題描述 直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考 ...


引用原文

原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

問題描述

直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務非同步處理。那麼如果使用Go如何實現?

傳統上,我們會考慮使用以下方法創建工作者層架構:

  • Resque(隊列,比如redis resque)
  • DelayedJob(延遲任務,比如go defer)
  • Elasticbeanstalk Worker Tier
  • RabbitMQ(消息隊列)

簡單慣用法

golang的非同步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。

不停創建協程也有壓垮系統的風險。然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理非同步任務,原因是不可控。下麵我們引用原作者的demo,一個執行耗時任務的handler。

代碼我們只用看大致的實現流程原理,實現細節暫且不論。

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對於適量的負載,這個方案應該沒有問題。但是負載增加以後這個方法就不能很好地工作。當我們把這個版本部署到生產環境中後,如果我們遇到了比預期大一個數量級的請求量。

那麼這個方法就有些不盡如人意了。它無法控制創建goroutine的數量。因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。

這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎麼能讓系統更可控呢?

優雅的方法

創建帶緩衝的channel。這樣我們可以把工作任務放到隊列里然後再上傳到S3。因為可以控制隊列的長度並且有充足的記憶體,我覺得把工作任務緩存在channel隊列里應該沒有問題。

所以一個很自然的思路那就是:建立任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。但是這樣只是延後了請求的爆發。

作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裡我將作者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。

 

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一個任務
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一個任務的執行過程如下
JobQueue <- work  新任務入隊
job := <-JobQueue: 調度中心收到任務
jobChannel := <-d.WorkerPool 從工作者池取到一個工作者
jobChannel <- job 任務給到工作者
job := <-w.JobChannel 工作者取出任務
{{1}} 執行任務
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

 這樣,我們已經能夠主動的控制worker的數量。這時候,我問哈大家,我們完全解決問題了麽?如果有大量的任務同時涌入,會發生什麼樣的結果。程式會阻塞等待可用的worker

jobChannel := <-d.WorkerPool

下麵是我們的dispatcher實現代碼:

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

這裡我們提供了創建worker的最大數目作為參數,並把這些worker加入到worker池裡。不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們再改一下代碼如下:

package main

import (
    "fmt"
    "reflect"
    "runtime"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空閑worker (任務多的時候會阻塞這裡")
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一個任務
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("當前協程數:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

執行結果如下:

 

 

 

這裡我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且非同步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎麼處理?

真正控制協程數量(併發執行的任務數)

我們可以控制併發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩衝的Channel控制併發執行的任務數。

當任務非同步處理完成的時候執行<- DispatchNumControl釋放控制即可。用這種方法,

我們可以根據壓測結果設置合適的併發數從而保證系統能夠儘可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方:牛仔我很忙)。

 比如定義一個limit函數讀取是否存在發送的任務隊列:

//用於控制併發處理的協程數
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("牛仔我很忙")
      return false
   case DispatchNumControl <- true:
   // 任務放入任務隊列channal
      jobChannel <- work
      return true
   }
}

結束語

我們本可以通過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過度使用。

我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力。
另外綜合上面的實現。為什麼 dispatch 這裡要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言。。。。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在最近的一周,我維護的業務系統出現了很多壞毛病,一周七天crash掉了4次,每次都需要都是因為一點很小的問題,觸發了蝴蝶效應,導致整個系統全盤崩潰,於是產生除了敘述本篇的想法,當然這並不是為了掩蓋我在Coding上的一些細節處理和職責疏忽,只是為了從根本的細節上去分析這些問題。 (一、)為什麼會產生 ...
  • 需求:1、點擊新增一欄商品信息,表單驗證區分 2、輸入SKU編碼,帶出當前行的產品名稱,品牌及單位 解決: ...
  • 本文將以UI設計師轉型做web前端作為案例,詳細解讀學web前端該學習哪些專業知識!當然也適用於所有想轉型web前端的親們! 如何學習web前端知識轉型? 不少UI設計師想要學習web前端知識,一方面是喜歡那些華麗的網頁特效,另一方面則是想成為全棧式UI設計師。 1、HTML/CSS學習 第一階段, ...
  • 使用插件: vue-i18n 1. 安裝: npm install vue-i18n 2. 目錄結構 // i18n.js import Vue from 'vue' import VueI18n from 'vue-i18n' import messages from './lang' Vue.u ...
  • 預設表單提示是英文的,我們可以安裝語言包構建多語言環境。 根據版本選擇命令 For Laravel 7.x : run composer require caouecs/laravel-lang:~6.0 For Laravel 6.x : run composer require caouecs/ ...
  • 1.代碼生成器: [正反雙向](單表、主表、明細表、樹形表,快速開發利器)freemaker模版技術 ,0個代碼不用寫,生成完整的一個模塊,帶頁面、建表sql腳本、處理類、service等完整模塊2.多數據源:(支持同時連接無數個資料庫,可以不同的模塊連接不同數的據庫)支持N個數據源3.阿裡資料庫連 ...
  • 基於Nexus搭建私服 1. 工作流程 2. 倉庫類型 hosted 私服倉庫 proxy倉庫 遠程倉庫 group倉庫 組倉庫,裡面可以設置組合多個倉庫。按順序獲取jar。 3. 預設倉庫 安裝好了Nexus後,會內置幾個maven的預設倉庫。可自定義倉庫。 maven-central proxy ...
  • 1 前言 相信不少人聽過這麼一句話: 人類的本質是復讀機。 在軟體開發領域也一樣,我們總是想尋找更好地方式複製優秀的邏輯或系統。最核心的方法是抽取通用邏輯和組件,把差異化的東西介面化或配置化,達到復用的效果。如Java的Build Once, Run Everywhere,還有Spring的強大的抽 ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...