ASP.NET Core 3.x 併發限制

来源:https://www.cnblogs.com/yyfh/archive/2019/11/12/11843358.html
-Advertisement-
Play Games

前言 Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免線程池的不足. 我們日常開發中可能常做的給某web伺服器配置連接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實現一個併發量以及隊列長 ...


前言

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免線程池的不足.
我們日常開發中可能常做的給某web伺服器配置連接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實現一個併發量以及隊列長度限制.

Queue策略

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //最大併發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //添加併發限制中間件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }     

通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做併發量限制,以及隊列的長度;那麼問題來了,他是怎麼實現的呢?

 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
        services.Configure(configure);
        services.AddSingleton<IQueuePolicy, QueuePolicy>();
        return services;
}

QueuePolicy採用的是SemaphoreSlim信號量設計,SemaphoreSlim、Semaphore(信號量)支持併發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數又遞增。

      /// <summary>
        ///     構造方法(初始化Queue策略)
        /// </summary>
        /// <param name="options"></param>
        public QueuePolicy(IOptions<QueuePolicyOptions> options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim來限制任務最大個數
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

ConcurrencyLimiterMiddleware中間件

        /// <summary>
        /// Invokes the logic of the middleware.
        /// </summary>
        /// <param name="context">The <see cref="HttpContext"/>.</param>
        /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
        public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }

每次當我們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法後先開啟一個私有lock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+最大併發請求數),如果當前數量超出了,那麼我直接拋出,送你個503狀態;

  if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

問題來了,我這邊如果說還沒到你設置的大小呢,我這個請求沒有給你伺服器造不成壓力,那麼你給我處理一下吧.
await _serverSemaphore.WaitAsync();非同步等待進入信號量,如果沒有線程被授予對信號量的訪問許可權,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止

 lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //非同步等待進入信號量,如果沒有線程被授予對信號量的訪問許可權,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
        await _serverSemaphore.WaitAsync();
        return true;
    }

返回成功後那麼中間件這邊再進行處理,_queuePolicy.OnExit();通過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減

Stack策略

再來看看另一種方法,棧策略,他是怎麼做的呢?一起來看看.再附加上如何使用的代碼.

     public void ConfigureServices(IServiceCollection services)
        {
            services.AddStackPolicy(options =>
            {
                //最大併發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }

通過上面的配置,我們便可以對我們的應用程式執行出相應的策略.下麵再來看看他是怎麼實現的呢

  public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
        {
            services.Configure(configure);
            services.AddSingleton<IQueuePolicy, StackPolicy>();
            return services;
        }

可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

        /// <summary>
        ///     構造方法(初始化參數)
        /// </summary>
        /// <param name="options"></param>
        public StackPolicy(IOptions<QueuePolicyOptions> options)
        {
            //棧分配
            _buffer = new List<ResettableBooleanCompletionSource>();
            //隊列大小
            _maxQueueCapacity = options.Value.RequestQueueLimit;
            //最大併發請求數
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            //剩餘可用空間
            _freeServerSpots = options.Value.MaxConcurrentRequests;
        }

當我們通過中間件請求調用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那麼則直接給我們返回true,讓中間件直接去執行下一步,如果當前隊列=我們設置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留後面的請求;

    public ValueTask<bool> TryEnterAsync()
        {
            lock (_bufferLock)
            {
                if (_freeServerSpots > 0)
                {
                    _freeServerSpots--;
                    return _trueTask;
                }
                // 如果隊列滿了,取消先前的請求
                if (_queueLength == _maxQueueCapacity)
                {
                    _hasReachedCapacity = true;
                    _buffer[_head].Complete(false);
                    _queueLength--;
                }
                var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
                _cachedResettableTCS = null;
                if (_hasReachedCapacity || _queueLength < _buffer.Count)
                {
                    _buffer[_head] = tcs;
                }
                else
                {
                    _buffer.Add(tcs);
                }
                _queueLength++;
                // increment _head for next time
                _head++;
                if (_head == _maxQueueCapacity)
                {
                    _head = 0;
                }
                return tcs.GetValueTask();
            }
        }

當我們請求後調用_queuePolicy.OnExit();出棧,再將請求長度遞減

    public void OnExit()
        {
            lock (_bufferLock)
            {
                if (_queueLength == 0)
                {
                    _freeServerSpots++;

                    if (_freeServerSpots > _maxConcurrentRequests)
                    {
                        _freeServerSpots--;
                        throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
                    }

                    return;
                }

                // step backwards and launch a new task
                if (_head == 0)
                {
                    _head = _maxQueueCapacity - 1;
                }
                else
                {
                    _head--;
                }
                //退出,出棧
                _buffer[_head].Complete(true);
                _queueLength--;
            }
        }

總結

基於棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:

  • 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
  • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

隊列存儲結構的實現有以下兩種方式:

  • 順序隊列:在順序表的基礎上實現的隊列結構;
  • 鏈隊列:在鏈表的基礎上實現的隊列結構;

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Spring之IOC容器初始化 前言 在前面我們分析了最底層的IOC容器BeanFactory,接著簡單分析了高級形態的容器ApplicationContext,在ApplicationContext 中我們知道一個核心方法 refresh,這裡面就是IOC容器的初始化流程,在前面並沒有直接去分析它 ...
  • 一.編寫shell腳本基本格式 拿最簡單的 舉例 . !/bin/bash:告訴電腦,使用bash解釋器來執行代碼 echo: 列印 二.運行shell腳本 (推薦使用) 三.註釋 四.定義變數 基本語法 shell 1.定義變數:變數名=變數值 2.撤銷變數:unset 變數名 3.聲明靜態變數 ...
  • 1.paramiko 用於幫助開發者通過代碼遠程連接伺服器,並對伺服器進行操作。 遠程執行命令【用戶名和密碼】 遠程執行命令【公鑰和私鑰】(公鑰必須提前上傳到伺服器) 遠程上傳和下載文件【用戶名和密碼】 遠程上傳和下載文件【公鑰和私鑰】 補充:通過私鑰字元串也可以連接遠程伺服器。 2.公司員工基於x ...
  • 多態是類的三大特性之一,抽象類又是多態的實現方法之一。抽象類是什麼呢,如果把虛方法比作一個盛有純凈水的杯子,那麼此時的“純凈水”就是事先定義好的方法,我們可以根據不同的需求來改變杯子中所事先盛放的是“純凈水”還是“咖啡”。但是抽象類呢,他更像是一個空的杯子,放在消毒櫃中,讓有需要的人自己去拿,去決定 ...
  • 類型的劃分 一個類型,要麼是值類型,要麼是引用類型 。區別在於拷貝方式:值類型拷貝值,引用類型拷貝引用 值類型 值類型直接包含值。相當於每一個值類型都有自己單獨的值: int a = 10; int b = a; a和b都有著自己的值,修改a並不會影響b,反過來一樣,互不影響。 即使是將實例傳給Co ...
  • 每一個擁有資料庫的項目,都會涉及到資料庫數據的操作,而很多時候都會用到相同的方法,但是只是涉及到的表不一樣,如果不對這些類似方法進行封裝,開發上就會造成時間上的浪費。 那麼如何對這些方法進行封裝呢? 要會封裝方法,最基本的得先瞭解 泛型 是什麼,什麼是泛型,博客園上有很多對這個的講解,我也相信,科班 ...
  • 一、.MemoryCache介紹 MemoryCache是.Net Framework 4.0開始提供的記憶體緩存類,使用該類型可以方便的在程式內部緩存數據並對於數據的有效性進行方便的管理, 它通過在記憶體中緩存數據和對象來減少讀取資料庫的次數,從而減輕資料庫負載,加快數據讀取速度,提升系統的性能。 二 ...
  • .Net Core Vue Qucik Start =========================== This is a ASP.NET Core 3.0 project seamlessly integrationed with Vue.js template. A complaint fr ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...