Go處理每分鐘100萬個請求

来源:https://www.cnblogs.com/phpper/archive/2020/06/30/13211343.html
-Advertisement-
Play Games

引用原文 原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 問題描述 直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考 ...


引用原文

原文鏈接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

問題描述

直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端需要處理大量耗時的任務時,我們一般都會考慮將耗時任務非同步處理。那麼如果使用Go如何實現?

傳統上,我們會考慮使用以下方法創建工作者層架構:

  • Resque(隊列,比如redis resque)
  • DelayedJob(延遲任務,比如go defer)
  • Elasticbeanstalk Worker Tier
  • RabbitMQ(消息隊列)

簡單慣用法

golang的非同步處理之攜程:go func()可以帶來了很大的方便,雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。

不停創建協程也有壓垮系統的風險。然而絕大多數的時候,我們不能簡單粗暴的創建協程來處理非同步任務,原因是不可控。下麵我們引用原作者的demo,一個執行耗時任務的handler。

代碼我們只用看大致的實現流程原理,實現細節暫且不論。

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對於適量的負載,這個方案應該沒有問題。但是負載增加以後這個方法就不能很好地工作。當我們把這個版本部署到生產環境中後,如果我們遇到了比預期大一個數量級的請求量。

那麼這個方法就有些不盡如人意了。它無法控制創建goroutine的數量。因為我們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。

這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎麼能讓系統更可控呢?

優雅的方法

創建帶緩衝的channel。這樣我們可以把工作任務放到隊列里然後再上傳到S3。因為可以控制隊列的長度並且有充足的記憶體,我覺得把工作任務緩存在channel隊列里應該沒有問題。

所以一個很自然的思路那就是:建立任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。但是這樣只是延後了請求的爆發。

作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裡我將作者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。

 

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一個任務
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一個任務的執行過程如下
JobQueue <- work  新任務入隊
job := <-JobQueue: 調度中心收到任務
jobChannel := <-d.WorkerPool 從工作者池取到一個工作者
jobChannel <- job 任務給到工作者
job := <-w.JobChannel 工作者取出任務
{{1}} 執行任務
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

 這樣,我們已經能夠主動的控制worker的數量。這時候,我問哈大家,我們完全解決問題了麽?如果有大量的任務同時涌入,會發生什麼樣的結果。程式會阻塞等待可用的worker

jobChannel := <-d.WorkerPool

下麵是我們的dispatcher實現代碼:

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

這裡我們提供了創建worker的最大數目作為參數,並把這些worker加入到worker池裡。不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們再改一下代碼如下:

package main

import (
    "fmt"
    "reflect"
    "runtime"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待執行的工作
type Job struct {
    Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
    JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
    quit       chan bool     //退出信號
    no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("創建一個新工作者單元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//迴圈  監聽任務和結束信號
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任務
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信號
                return
            }
        }
    }()
}

// 停止信號
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//調度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者數量
    MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空閑worker (任務多的時候會阻塞這裡")
                //等待空閑worker (任務多的時候會阻塞這裡)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 將任務放到上述woker的私有任務channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一個任務
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任務放入任務隊列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("當前協程數:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

執行結果如下:

 

 

 

這裡我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且非同步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎麼處理?

真正控制協程數量(併發執行的任務數)

我們可以控制併發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩衝的Channel控制併發執行的任務數。

當任務非同步處理完成的時候執行<- DispatchNumControl釋放控制即可。用這種方法,

我們可以根據壓測結果設置合適的併發數從而保證系統能夠儘可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方:牛仔我很忙)。

 比如定義一個limit函數讀取是否存在發送的任務隊列:

//用於控制併發處理的協程數
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("牛仔我很忙")
      return false
   case DispatchNumControl <- true:
   // 任務放入任務隊列channal
      jobChannel <- work
      return true
   }
}

結束語

我們本可以通過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過度使用。

我們做系統設計的時候,一定也要時刻想著控制:要對自己設計的系統有著足夠的控制力。
另外綜合上面的實現。為什麼 dispatch 這裡要用 協程 呢?阻塞完全沒問題? 歡迎廣大博友拍磚留言。。。。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在最近的一周,我維護的業務系統出現了很多壞毛病,一周七天crash掉了4次,每次都需要都是因為一點很小的問題,觸發了蝴蝶效應,導致整個系統全盤崩潰,於是產生除了敘述本篇的想法,當然這並不是為了掩蓋我在Coding上的一些細節處理和職責疏忽,只是為了從根本的細節上去分析這些問題。 (一、)為什麼會產生 ...
  • 需求:1、點擊新增一欄商品信息,表單驗證區分 2、輸入SKU編碼,帶出當前行的產品名稱,品牌及單位 解決: ...
  • 本文將以UI設計師轉型做web前端作為案例,詳細解讀學web前端該學習哪些專業知識!當然也適用於所有想轉型web前端的親們! 如何學習web前端知識轉型? 不少UI設計師想要學習web前端知識,一方面是喜歡那些華麗的網頁特效,另一方面則是想成為全棧式UI設計師。 1、HTML/CSS學習 第一階段, ...
  • 使用插件: vue-i18n 1. 安裝: npm install vue-i18n 2. 目錄結構 // i18n.js import Vue from 'vue' import VueI18n from 'vue-i18n' import messages from './lang' Vue.u ...
  • 預設表單提示是英文的,我們可以安裝語言包構建多語言環境。 根據版本選擇命令 For Laravel 7.x : run composer require caouecs/laravel-lang:~6.0 For Laravel 6.x : run composer require caouecs/ ...
  • 1.代碼生成器: [正反雙向](單表、主表、明細表、樹形表,快速開發利器)freemaker模版技術 ,0個代碼不用寫,生成完整的一個模塊,帶頁面、建表sql腳本、處理類、service等完整模塊2.多數據源:(支持同時連接無數個資料庫,可以不同的模塊連接不同數的據庫)支持N個數據源3.阿裡資料庫連 ...
  • 基於Nexus搭建私服 1. 工作流程 2. 倉庫類型 hosted 私服倉庫 proxy倉庫 遠程倉庫 group倉庫 組倉庫,裡面可以設置組合多個倉庫。按順序獲取jar。 3. 預設倉庫 安裝好了Nexus後,會內置幾個maven的預設倉庫。可自定義倉庫。 maven-central proxy ...
  • 1 前言 相信不少人聽過這麼一句話: 人類的本質是復讀機。 在軟體開發領域也一樣,我們總是想尋找更好地方式複製優秀的邏輯或系統。最核心的方法是抽取通用邏輯和組件,把差異化的東西介面化或配置化,達到復用的效果。如Java的Build Once, Run Everywhere,還有Spring的強大的抽 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...