ASP.NET Core 3.x 併發限制

来源:https://www.cnblogs.com/yyfh/archive/2019/11/12/11843358.html
-Advertisement-
Play Games

前言 Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免線程池的不足. 我們日常開發中可能常做的給某web伺服器配置連接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實現一個併發量以及隊列長 ...


前言

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免線程池的不足.
我們日常開發中可能常做的給某web伺服器配置連接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實現一個併發量以及隊列長度限制.

Queue策略

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //最大併發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //添加併發限制中間件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }     

通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做併發量限制,以及隊列的長度;那麼問題來了,他是怎麼實現的呢?

 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
        services.Configure(configure);
        services.AddSingleton<IQueuePolicy, QueuePolicy>();
        return services;
}

QueuePolicy採用的是SemaphoreSlim信號量設計,SemaphoreSlim、Semaphore(信號量)支持併發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數又遞增。

      /// <summary>
        ///     構造方法(初始化Queue策略)
        /// </summary>
        /// <param name="options"></param>
        public QueuePolicy(IOptions<QueuePolicyOptions> options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim來限制任務最大個數
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

ConcurrencyLimiterMiddleware中間件

        /// <summary>
        /// Invokes the logic of the middleware.
        /// </summary>
        /// <param name="context">The <see cref="HttpContext"/>.</param>
        /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
        public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }

每次當我們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法後先開啟一個私有lock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+最大併發請求數),如果當前數量超出了,那麼我直接拋出,送你個503狀態;

  if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

問題來了,我這邊如果說還沒到你設置的大小呢,我這個請求沒有給你伺服器造不成壓力,那麼你給我處理一下吧.
await _serverSemaphore.WaitAsync();非同步等待進入信號量,如果沒有線程被授予對信號量的訪問許可權,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止

 lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //非同步等待進入信號量,如果沒有線程被授予對信號量的訪問許可權,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
        await _serverSemaphore.WaitAsync();
        return true;
    }

返回成功後那麼中間件這邊再進行處理,_queuePolicy.OnExit();通過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減

Stack策略

再來看看另一種方法,棧策略,他是怎麼做的呢?一起來看看.再附加上如何使用的代碼.

     public void ConfigureServices(IServiceCollection services)
        {
            services.AddStackPolicy(options =>
            {
                //最大併發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }

通過上面的配置,我們便可以對我們的應用程式執行出相應的策略.下麵再來看看他是怎麼實現的呢

  public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
        {
            services.Configure(configure);
            services.AddSingleton<IQueuePolicy, StackPolicy>();
            return services;
        }

可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

        /// <summary>
        ///     構造方法(初始化參數)
        /// </summary>
        /// <param name="options"></param>
        public StackPolicy(IOptions<QueuePolicyOptions> options)
        {
            //棧分配
            _buffer = new List<ResettableBooleanCompletionSource>();
            //隊列大小
            _maxQueueCapacity = options.Value.RequestQueueLimit;
            //最大併發請求數
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            //剩餘可用空間
            _freeServerSpots = options.Value.MaxConcurrentRequests;
        }

當我們通過中間件請求調用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那麼則直接給我們返回true,讓中間件直接去執行下一步,如果當前隊列=我們設置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留後面的請求;

    public ValueTask<bool> TryEnterAsync()
        {
            lock (_bufferLock)
            {
                if (_freeServerSpots > 0)
                {
                    _freeServerSpots--;
                    return _trueTask;
                }
                // 如果隊列滿了,取消先前的請求
                if (_queueLength == _maxQueueCapacity)
                {
                    _hasReachedCapacity = true;
                    _buffer[_head].Complete(false);
                    _queueLength--;
                }
                var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
                _cachedResettableTCS = null;
                if (_hasReachedCapacity || _queueLength < _buffer.Count)
                {
                    _buffer[_head] = tcs;
                }
                else
                {
                    _buffer.Add(tcs);
                }
                _queueLength++;
                // increment _head for next time
                _head++;
                if (_head == _maxQueueCapacity)
                {
                    _head = 0;
                }
                return tcs.GetValueTask();
            }
        }

當我們請求後調用_queuePolicy.OnExit();出棧,再將請求長度遞減

    public void OnExit()
        {
            lock (_bufferLock)
            {
                if (_queueLength == 0)
                {
                    _freeServerSpots++;

                    if (_freeServerSpots > _maxConcurrentRequests)
                    {
                        _freeServerSpots--;
                        throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
                    }

                    return;
                }

                // step backwards and launch a new task
                if (_head == 0)
                {
                    _head = _maxQueueCapacity - 1;
                }
                else
                {
                    _head--;
                }
                //退出,出棧
                _buffer[_head].Complete(true);
                _queueLength--;
            }
        }

總結

基於棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:

  • 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
  • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

隊列存儲結構的實現有以下兩種方式:

  • 順序隊列:在順序表的基礎上實現的隊列結構;
  • 鏈隊列:在鏈表的基礎上實現的隊列結構;

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Spring之IOC容器初始化 前言 在前面我們分析了最底層的IOC容器BeanFactory,接著簡單分析了高級形態的容器ApplicationContext,在ApplicationContext 中我們知道一個核心方法 refresh,這裡面就是IOC容器的初始化流程,在前面並沒有直接去分析它 ...
  • 一.編寫shell腳本基本格式 拿最簡單的 舉例 . !/bin/bash:告訴電腦,使用bash解釋器來執行代碼 echo: 列印 二.運行shell腳本 (推薦使用) 三.註釋 四.定義變數 基本語法 shell 1.定義變數:變數名=變數值 2.撤銷變數:unset 變數名 3.聲明靜態變數 ...
  • 1.paramiko 用於幫助開發者通過代碼遠程連接伺服器,並對伺服器進行操作。 遠程執行命令【用戶名和密碼】 遠程執行命令【公鑰和私鑰】(公鑰必須提前上傳到伺服器) 遠程上傳和下載文件【用戶名和密碼】 遠程上傳和下載文件【公鑰和私鑰】 補充:通過私鑰字元串也可以連接遠程伺服器。 2.公司員工基於x ...
  • 多態是類的三大特性之一,抽象類又是多態的實現方法之一。抽象類是什麼呢,如果把虛方法比作一個盛有純凈水的杯子,那麼此時的“純凈水”就是事先定義好的方法,我們可以根據不同的需求來改變杯子中所事先盛放的是“純凈水”還是“咖啡”。但是抽象類呢,他更像是一個空的杯子,放在消毒櫃中,讓有需要的人自己去拿,去決定 ...
  • 類型的劃分 一個類型,要麼是值類型,要麼是引用類型 。區別在於拷貝方式:值類型拷貝值,引用類型拷貝引用 值類型 值類型直接包含值。相當於每一個值類型都有自己單獨的值: int a = 10; int b = a; a和b都有著自己的值,修改a並不會影響b,反過來一樣,互不影響。 即使是將實例傳給Co ...
  • 每一個擁有資料庫的項目,都會涉及到資料庫數據的操作,而很多時候都會用到相同的方法,但是只是涉及到的表不一樣,如果不對這些類似方法進行封裝,開發上就會造成時間上的浪費。 那麼如何對這些方法進行封裝呢? 要會封裝方法,最基本的得先瞭解 泛型 是什麼,什麼是泛型,博客園上有很多對這個的講解,我也相信,科班 ...
  • 一、.MemoryCache介紹 MemoryCache是.Net Framework 4.0開始提供的記憶體緩存類,使用該類型可以方便的在程式內部緩存數據並對於數據的有效性進行方便的管理, 它通過在記憶體中緩存數據和對象來減少讀取資料庫的次數,從而減輕資料庫負載,加快數據讀取速度,提升系統的性能。 二 ...
  • .Net Core Vue Qucik Start =========================== This is a ASP.NET Core 3.0 project seamlessly integrationed with Vue.js template. A complaint fr ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...