ASP.NET Core中如何限制響應發送速率(不是調用頻率)

来源:https://www.cnblogs.com/coredx/archive/2023/03/08/17195492.html
-Advertisement-
Play Games

前言 ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。 現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間 ...


前言

ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。
現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間傳輸流式數據的情況時很有用,避免少量請求耗盡網路帶寬,儘可能同時服務更多請求。

Tip

本文節選自我的新書《C#與.NET6 開發從入門到實踐》12.11 流量控制。實現方式偏向知識講解和教學,不保證組件穩定性,不建議直接在產品中使用。有關新書的更多介紹歡迎查看《C#與.NET6 開發從入門到實踐》預售,作者親自來打廣告了!
image

正文

用過百度網盤的人應該都深有體會,如果沒有會員,下載速度會非常慢。實現這種效果的方法有兩種:控制TCP協議的滑動視窗大小;控制響應流的寫入大小和頻率。偏向系統底層的流量控制軟體因為無法干涉軟體中的流,所以一般會直接控制內核TCP協議的滑動視窗大小;而下載軟體等客戶端應用通常直接控制流的寫入和讀取,此時TCP協議的擁塞控制演算法會自動調整滑動視窗大小。這種流量控制對提供大型多媒體資源的應用(例如線上視頻網站)非常重要,能防止一個請求的響應占用太多帶寬影響其他請求的響應發送。
ASP.NET Core並沒有原生提供相關功能,Nuget上也沒有找到相關的程式包(截止截稿)。但其實利用ASP.NET Core提供的介面,是可以實現這個功能的。筆者以ASP.NET Core的響應壓縮中間件為藍本,實現了一個簡單的響應限流中間件。

編寫節流組件

支持限速的基礎流

using System;

namespace AccessControlElementary;

/// <summary>
/// 支持流量控制的流
/// </summary>
public class ThrottlingStream : Stream
{
    /// <summary>
    /// 用於指定每秒可傳輸的無限位元組數的常數。
    /// </summary>
    public const long Infinite = 0;

    #region Private members
    /// <summary>
    /// 基礎流
    /// </summary>
    private readonly Stream _baseStream;

    /// <summary>
    /// 每秒可通過基礎流傳輸的最大位元組數。
    /// </summary>
    private long _maximumBytesPerSecond;

    /// <summary>
    /// 自上次限制以來已傳輸的位元組數。
    /// </summary>
    private long _byteCount;

    /// <summary>
    /// 最後一次限制的開始時間(毫秒)。
    /// </summary>
    private long _start;
    #endregion

    #region Properties

    /// <summary>
    /// 獲取當前毫秒數。
    /// </summary>
    /// <value>當前毫秒數。</value>
    protected long CurrentMilliseconds => Environment.TickCount;

    /// <summary>
    /// 獲取或設置每秒可通過基礎流傳輸的最大位元組數。
    /// </summary>
    /// <value>每秒最大位元組數。</value>
    public long MaximumBytesPerSecond
    {
        get => _maximumBytesPerSecond;
        set
        {
            if (MaximumBytesPerSecond != value)
            {
                _maximumBytesPerSecond = value;
                Reset();
            }
        }
    }

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持讀取。
    /// </summary>
    /// <returns>如果流支持讀取,則為true;否則為false。</returns>
    public override bool CanRead => _baseStream.CanRead;

    /// <summary>
    /// 獲取估算的流當前的比特率(單位:bps)。
    /// </summary>
    public long CurrentBitsPerSecond { get; protected set; }

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持定位。
    /// </summary>
    /// <value></value>
    /// <returns>如果流支持定位,則為true;否則為false。</returns>
    public override bool CanSeek => _baseStream.CanSeek;

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持寫入。
    /// </summary>
    /// <value></value>
    /// <returns>如果流支持寫入,則為true;否則為false。</returns>
    public override bool CanWrite => _baseStream.CanWrite;

    /// <summary>
    /// 獲取流的長度(以位元組為單位)。
    /// </summary>
    /// <value></value>
    /// <returns>一個long值,表示流的長度(位元組)。</returns>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Length => _baseStream.Length;

    /// <summary>
    /// 獲取或設置當前流中的位置。
    /// </summary>
    /// <value></value>
    /// <returns>流中的當前位置。</returns>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Position
    {
        get => _baseStream.Position;
        set => _baseStream.Position = value;
    }
    #endregion

    #region Ctor

    /// <summary>
    /// 使用每秒可傳輸無限位元組數的常數初始化 <see cref="T:ThrottlingStream"/> 類的新實例。
    /// </summary>
    /// <param name="baseStream">基礎流。</param>
    public ThrottlingStream(Stream baseStream)
        : this(baseStream, Infinite) { }

    /// <summary>
    /// 初始化 <see cref="T:ThrottlingStream"/> 類的新實例。
    /// </summary>
    /// <param name="baseStream">基礎流。</param>
    /// <param name="maximumBytesPerSecond">每秒可通過基礎流傳輸的最大位元組數。</param>
    /// <exception cref="ArgumentNullException">當 <see cref="baseStream"/> 是null引用時拋出。</exception>
    /// <exception cref="ArgumentOutOfRangeException">當 <see cref="maximumBytesPerSecond"/> 是負數時拋出.</exception>
    public ThrottlingStream(Stream baseStream, long maximumBytesPerSecond)
    {
        if (maximumBytesPerSecond < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maximumBytesPerSecond),
                maximumBytesPerSecond, "The maximum number of bytes per second can't be negatie.");
        }

        _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
        _maximumBytesPerSecond = maximumBytesPerSecond;
        _start = CurrentMilliseconds;
        _byteCount = 0;
    }
    #endregion

    #region Public methods

    /// <summary>
    /// 清除此流的所有緩衝區,並將所有緩衝數據寫入基礎設備。
    /// </summary>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    public override void Flush() => _baseStream.Flush();

    /// <summary>
    /// 清除此流的所有緩衝區,並將所有緩衝數據寫入基礎設備。
    /// </summary>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">位元組數組。當此方法返回時,緩衝區包含指定的位元組數組,其值介於offset和(offset+count-1)之間,由從當前源讀取的位元組替換。</param>
    /// <param name="offset">緩衝區中從零開始的位元組偏移量,開始存儲從當前流中讀取的數據。</param>
    /// <param name="count">從當前流中讀取的最大位元組數。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        Throttle(count);

        return _baseStream.Read(buffer, offset, count);
    }

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">位元組數組。當此方法返回時,緩衝區包含指定的位元組數組,其值介於offset和(offset+count-1)之間,由從當前源讀取的位元組替換。</param>
    /// <param name="offset">緩衝區中從零開始的位元組偏移量,開始存儲從當前流中讀取的數據。</param>
    /// <param name="count">從當前流中讀取的最大位元組數。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">記憶體緩衝區。當此方法返回時,緩衝區包含讀取的數據。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await ThrottleAsync(buffer.Length, cancellationToken);
        return await _baseStream.ReadAsync(buffer, cancellationToken);
    }

    /// <summary>
    /// 設置當前流中的位置。
    /// </summary>
    /// <param name="offset">相對於參考點的位元組偏移量。</param>
    /// <param name="origin">類型為<see cref="T:System.IO.SeekOrigin"/>的值,指示用於獲取新位置的參考點。</param>
    /// <returns>
    /// 當前流中的新位置。
    /// </returns>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位,例如流是從管道或控制台輸出構造的。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _baseStream.Seek(offset, origin);
    }

    /// <summary>
    /// 設置當前流的長度。
    /// </summary>
    /// <param name="value">當前流的所需長度(位元組)。</param>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入和定位,例如流是從管道或控制台輸出構造的。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override void SetLength(long value)
    {
        _baseStream.SetLength(value);
    }

    /// <summary>
    /// 將位元組序列寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">位元組數組。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="offset">緩衝區中從零開始向當前流複製位元組的位元組偏移量。</param>
    /// <param name="count">要寫入當前流的位元組數。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        _baseStream.Write(buffer, offset, count);
    }

    /// <summary>
    /// 將位元組序列寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">位元組數組。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="offset">緩衝區中從零開始向當前流複製位元組的位元組偏移量。</param>
    /// <param name="count">要寫入當前流的位元組數。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    /// <summary>
    /// 將記憶體緩衝區寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">記憶體緩衝區。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await ThrottleAsync(buffer.Length, cancellationToken);
        await _baseStream.WriteAsync(buffer, cancellationToken);
    }

    /// <summary>
    /// 返回一個表示當前<see cref="T:System.Object" />的<see cref="T:System.String" />。
    /// </summary>
    /// <returns>
    /// 表示當前<see cref="T:System.Object" />的<see cref="T:System.String" />。
    /// </returns>
    public override string ToString()
    {
        return _baseStream.ToString()!;
    }
    #endregion

    #region Protected methods

    /// <summary>
    /// 如果比特率大於最大比特率,嘗試限流
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    protected void Throttle(int bufferSizeInBytes)
    {
        var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
        if (toSleep > 1)
        {
            try
            {
                Thread.Sleep(toSleep);
            }
            catch (ThreadAbortException)
            {
                // 忽略ThreadAbortException。
            }

            // 睡眠已經完成,重置限流
            Reset();
        }
    }

    /// <summary>
    /// 如果比特率大於最大比特率,嘗試限流。
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    protected async Task ThrottleAsync(int bufferSizeInBytes, CancellationToken cancellationToken)
    {
        var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
        if (toSleep > 1)
        {
            try
            {
                await Task.Delay(toSleep, cancellationToken);
            }
            catch (TaskCanceledException)
            {
                // 忽略TaskCanceledException。
            }

            // 延遲已經完成,重置限流。
            Reset();
        }
    }

    /// <summary>
    /// 計算在操作流之前應當延遲的時間(單位:毫秒)。
    /// 更新流當前的比特率。
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    /// <returns>應當延遲的時間(毫秒)。</returns>
    protected int CaculateThrottlingMilliseconds(int bufferSizeInBytes)
    {
        int toSleep = 0;

        // 確保緩衝區不為null
        if (bufferSizeInBytes <= 0)
        {
            CurrentBitsPerSecond = 0;
        }
        else
        {
            _byteCount += bufferSizeInBytes;
            long elapsedMilliseconds = CurrentMilliseconds - _start;

            if (elapsedMilliseconds > 0)
            {
                // 計算當前瞬時比特率
                var bp = _byteCount * 1000L;
                var bps = bp / elapsedMilliseconds;
                var avgBps = bps;

                //如果bps大於最大bps,返回應當延遲的時間。
                if (_maximumBytesPerSecond > 0 && bps > _maximumBytesPerSecond)
                {
                    // 計算延遲時間
                    long wakeElapsed = bp / _maximumBytesPerSecond;
                    var result = (int)(wakeElapsed - elapsedMilliseconds);
                    // 計算平均比特率
                    var div = result / 1000.0;
                    avgBps = (long)(bps / (div == 0 ? 1 : div));

                    if (result > 1)
                    {
                        toSleep = result; ;
                    }
                }
                // 更新當前(平均)比特率
                CurrentBitsPerSecond = (long)(avgBps / 8);
            }
        }

        return toSleep;
    }

    /// <summary>
    /// 將位元組數重置為0,並將開始時間重置為當前時間。
    /// </summary>
    protected void Reset()
    {
        long difference = CurrentMilliseconds - _start;

        // 只有在已知歷史記錄可用時間超過1秒時才重置計數器。
        if (difference > 1000)
        {
            _byteCount = 0;
            _start = CurrentMilliseconds;
        }
    }

    #endregion
}

CaculateThrottleMilliseconds 、Throttle和ThrottleAsync是這個流的核心。CaculateThrottleMilliseconds方法負責計算在寫入或讀取流之前應該延遲多久和更新流當前的傳輸速率,Throttle和ThrottleAsync方法負責同步和非同步延遲。

限流響應正文

using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using System.IO.Pipelines;
using System;

namespace AccessControlElementary;

// 自定義的HTTP功能介面,提供獲取限流速率設置和當前速率的獲取能力
public interface IHttpResponseThrottlingFeature
{
    public long? MaximumBytesPerSecond { get; }
    public long? CurrentBitsPerSecond { get; }
}

// 限流響應正文的實現類,實現了自定義的功能介面
public class ThrottlingResponseBody : Stream, IHttpResponseBodyFeature, IHttpResponseThrottlingFeature
{
    private readonly IHttpResponseBodyFeature _innerBodyFeature;
    private readonly IOptionsSnapshot<ResponseThrottlingOptions> _options;
    private readonly HttpContext _httpContext;
    private readonly Stream _innerStream;

    private ThrottlingStream? _throttlingStream;
    private PipeWriter? _pipeAdapter;
    private bool _throttlingChecked;
    private bool _complete;
    private int _throttlingRefreshCycleCount;

    public ThrottlingResponseBody(IHttpResponseBodyFeature innerBodyFeature, HttpContext httpContext, IOptionsSnapshot<ResponseThrottlingOptions> options)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _httpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
        _innerBodyFeature = innerBodyFeature ?? throw new ArgumentNullException(nameof(innerBodyFeature));
        _innerStream = innerBodyFeature.Stream;
        _throttlingRefreshCycleCount = 0;
    }

    public override bool CanRead => false;

    public override bool CanSeek => false;

    public override bool CanWrite => _innerStream.CanWrite;

    public override long Length => _innerStream.Length;

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public Stream Stream => this;

    public PipeWriter Writer
    {
        get
        {
            if (_pipeAdapter == null)
            {
                _pipeAdapter = PipeWriter.Create(Stream, new StreamPipeWriterOptions(leaveOpen: true));
                if (_complete)
                {
                    _pipeAdapter.Complete();
                }
            }

            return _pipeAdapter;
        }
    }

    public long? MaximumBytesPerSecond => _throttlingStream?.MaximumBytesPerSecond;

    public long? CurrentBitsPerSecond => _throttlingStream?.CurrentBitsPerSecond;

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count)
    {
        OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();

        if (_throttlingStream != null)
        {
            _throttlingStream.Write(buffer, offset, count);
            _throttlingStream.Flush();
        }
        else
        {
            _innerStream.Write(buffer, offset, count);
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await OnWriteAsync();

        if (_throttlingStream != null)
        {
            await _throttlingStream.WriteAsync(buffer, cancellationToken);
            await _throttlingStream.FlushAsync(cancellationToken);
        }
        else
        {
            await _innerStream.WriteAsync(buffer, cancellationToken);
        }
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
    {
        var tcs = new TaskCompletionSource(state: state, TaskCreationOptions.RunContinuationsAsynchronously);
        InternalWriteAsync(buffer, offset, count, callback, tcs);
        return tcs.Task;
    }

    private async void InternalWriteAsync(byte[] buffer, int offset, int count, AsyncCallback? callback, TaskCompletionSource tcs)
    {
        try
        {
            await WriteAsync(buffer.AsMemory(offset, count));
            tcs.TrySetResult();
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }

        if (callback != null)
        {
            // Offload callbacks to avoid stack dives on sync completions.
            var ignored = Task.Run(() =>
            {
                try
                {
                    callback(tcs.Task);
                }
                catch (Exception)
                {
                    // Suppress exceptions on background threads.
                }
            });
        }
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
        {
            throw new ArgumentNullException(nameof(asyncResult));
        }

        var task = (Task)asyncResult;
        task.GetAwaiter().GetResult();
    }

    public async Task CompleteAsync()
    {
        if (_complete)
        {
            return;
        }

        await FinishThrottlingAsync(); // Sets _complete
        await _innerBodyFeature.CompleteAsync();
    }

    public void DisableBuffering()
    {
        _innerBodyFeature?.DisableBuffering();
    }

    public override void Flush()
    {
        if (!_throttlingChecked)
        {
            OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
            // Flush the original stream to send the headers. Flushing the compression stream won't
            // flush the original stream if no data has been written yet.
            _innerStream.Flush();
            return;
        }

        if (_throttlingStream != null)
        {
            _throttlingStream.Flush();
        }
        else
        {
            _innerStream.Flush();
        }
    }

    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        if (!_throttlingChecked)
        {
            await OnWriteAsync();
            // Flush the original stream to send the headers. Flushing the compression stream won't
            // flush the original stream if no data has been written yet.
            await _innerStream.FlushAsync(cancellationToken);
            return;
        }

        if (_throttlingStream != null)
        {
            await _throttlingStream.FlushAsync(cancellationToken);
            return;
        }

        await _innerStream.FlushAsync(cancellationToken);
    }

    public async Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellationToken)
    {
        await OnWriteAsync();

        if (_throttlingStream != null)
        {
            await SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellationToken);
            return;
        }

        await _innerBodyFeature.SendFileAsync(path, offset, count, cancellationToken);
    }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await OnWriteAsync();
        await _innerBodyFeature.StartAsync(cancellationToken);
    }

    internal async Task FinishThrottlingAsync()
    {
        if (_complete)
        {
            return;
        }

        _complete = true;

        if (_pipeAdapter != null)
        {
            await _pipeAdapter.CompleteAsync();
        }

        if (_throttlingStream != null)
        {
            await _throttlingStream.DisposeAsync();
        }
    }

    private async Task OnWriteAsync()
    {
        if (!_throttlingChecked)
        {
            _throttlingChecked = true;
            var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
            _throttlingStream = new ThrottlingStream(_innerStream, maxValue < 0 ? 0 : maxValue);
        }

        if (_throttlingStream != null && _options?.Value?.ThrottlingRefreshCycle > 0)
        {
            if (_throttlingRefreshCycleCount >= _options.Value.ThrottlingRefreshCycle)
            {
                _throttlingRefreshCycleCount = 0;

                var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
                _throttlingStream.MaximumBytesPerSecond = maxValue < 0 ? 0 : maxValue;
            }
            else
            {
                _throttlingRefreshCycleCount++;
            }
        }
    }
}

自定義的響應正文類必須實現IHttpResponseBodyFeature介面才能作為應用的底層響應流使用,設計和實現參考ASP.NET Core的ResponseCompressionBody。

響應限流中間件

using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using Timer = System.Timers.Timer;

namespace AccessControlElementary;

public class ResponseThrottlingMiddleware
{
    private readonly RequestDelegate _next;

    public ResponseThrottlingMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task Invoke(HttpContext context, IOptionsSnapshot<ResponseThrottlingOptions> options, ILogger<ResponseThrottlingMiddleware> logger)
    {
        ThrottlingResponseBody throttlingBody = null;
        IHttpResponseBodyFeature originalBodyFeature = null;

        var shouldThrottling = await options?.Value?.ShouldThrottling?.Invoke(context);
        if (shouldThrottling == true)
        {
            //獲取原始輸出Body
            originalBodyFeature = context.Features.Get<IHttpResponseBodyFeature>();
            //初始化限流Body
            throttlingBody = new ThrottlingResponseBody(originalBodyFeature, context, options);
            //設置成限流Body
            context.Features.Set<IHttpResponseBodyFeature>(throttlingBody);
            context.Features.Set<IHttpResponseThrottlingFeature>(throttlingBody);
            // 用定時器定期向外彙報信息,這可能導致性能下降,僅用於演示目的
            var timer = new Timer(1000);
            timer.AutoReset = true;
            long? currentBitsPerSecond = null;
            var traceIdentifier = context.TraceIdentifier;

            timer.Elapsed += (sender, arg) =>
            {
                if (throttlingBody.CurrentBitsPerSecond != currentBitsPerSecond)
                {
                    currentBitsPerSecond = throttlingBody.CurrentBitsPerSecond;

                    var bps = (double)(throttlingBody.CurrentBitsPerSecond ?? 0);
                    var (unitBps, unit) = bps switch
                    {
                        < 1000 => (bps, "bps"),
                        < 1000_000 => (bps / 1000, "kbps"),
                        _ => (bps / 1000_000, "mbps"),
                    };

                    logger.LogDebug("請求:{RequestTraceIdentifier} 當前響應發送速率:{CurrentBitsPerSecond} {Unit}。", traceIdentifier, unitBps, unit);
                }
            };

            // 開始發送響應後啟動定時器
            context.Response.OnStarting(async () =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 開始發送響應。", traceIdentifier);
                timer.Start();
            });

            // 響應發送完成後銷毀定時器
            context.Response.OnCompleted(async () =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 響應發送完成。", traceIdentifier);
                timer.Stop();
                timer?.Dispose();
            });

            // 請求取消後銷毀定時器
            context.RequestAborted.Register(() =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 已中止。", traceIdentifier);
                timer.Stop();
                timer?.Dispose();
            });
        }

        try
        {
            await _next(context);
            if (shouldThrottling == true)
            {
                // 刷新響應流,確保所有數據都發送到網卡
                await throttlingBody.FinishThrottlingAsync();
            }
        }
        finally
        {
            if (shouldThrottling == true)
            {
                //限流發生錯誤,恢複原始Body
                context.Features.Set(originalBodyFeature);
            }
        }
    }
}

中間件負責把基礎響應流替換為限流響應流,併為每個請求重新讀取選項,使每個請求都能夠獨立控制限流的速率,然後在響應發送啟動後記錄響應的發送速率。

響應限流選項

namespace AccessControlElementary;

public class ResponseThrottlingOptions
{
    /// <summary>
    /// 獲取或設置流量限制的值的刷新周期,刷新時會重新調用<see cref="ThrottlingProvider"/>設置限制值。
    /// 值越大刷新間隔越久,0或負數表示永不刷新。
    /// </summary>
    public int ThrottlingRefreshCycle { get; set; }

    /// <summary>
    /// 獲取或設置指示是否應該啟用流量控制的委托
    /// </summary>
    public Func<HttpContext, Task<bool>> ShouldThrottling { get; set; }

    /// <summary>
    /// 獲取或設置指示流量限制大小的委托(單位:Byte/s)
    /// </summary>
    public Func<HttpContext, Task<int>> ThrottlingProvider { get; set; }
}

響應限流服務註冊和中間件配置擴展

namespace AccessControlElementary;

// 配置中間件用的輔助類和擴展方法
public static class ResponseThrottlingMiddlewareExtensions
{
    public static IApplicationBuilder UseResponseThrottling(this IApplicationBuilder app)
    {
        return app.UseMiddleware<ResponseThrottlingMiddleware>();
    }
}

// 註冊中間件需要的服務的輔助類和擴展方法
public static class ResponseThrottlingServicesExtensions
{
    public static IServiceCollection AddResponseThrottling(this IServiceCollection services, Action<ResponseThrottlingOptions> configureOptions = null)
    {
        services.Configure(configureOptions);
        return services;
    }
}

使用節流組件

服務註冊和請求管道配置

Startup啟動配置

namespace AccessControlElementary;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // 註冊限流服務和選項
        services.AddResponseThrottling(options =>
        {
            options.ThrottlingRefreshCycle = 100;
            options.ShouldThrottling = static async _ => true;
            options.ThrottlingProvider = static async _ => 100 * 1024; // 100KB/s
        });

        services.AddRazorPages();
    }

    public void Configure(IApplicationBuilder app)
    {
        // 配置響應限流中間件
        app.UseResponseThrottling();

        app.UseStaticFiles();

        app.UseRouting();

        app.UseAuthentication();
        app.UseAuthorization();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapRazorPages();
        });
    }
}

示例展示瞭如何配置和啟用響應限流。ThrottlingRefreshCycle設置為每100次響應流寫入周期刷新一次流量限制的值,使限流值能在響應發送中動態調整;ShouldThrottling設置為無條件啟用限流;ThrottlingProvider設置為限速100 KB/s。
請求只有在UseResponseThrottling之前配置的短路中間件處被處理時不會受影響,請求沒有被短路的話,只要經過限流中間件,基礎響應流就被替換了。如果同時使用了響應壓縮,會變成限流響應包裹壓縮響應(或者相反),壓縮響應(或者限流響應)又包裹基礎響應的嵌套結構。

結語

本書在介紹.NET 6基礎知識時會儘可能使用具有現實意義的示例避免學習和實踐脫節,本文就是其中之一,如果本文對您有價值,歡迎繼續瞭解和購買本書。《C#與.NET6 開發從入門到實踐》預售,作者親自來打廣告了!

本文地址:https://www.cnblogs.com/coredx/p/17195492.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 說起開源CMS,你會想到哪些呢?WordPress?DoraCMS?joomla? 今天再給大家推薦一個非常好用的開源CMS:Wagtail 如果您正在選型的話,可以瞭解一下Wagtail的特點: 基於Django構建,具有出色的文檔管理功能和友好的用戶界面。 提供了一個靈活且易於使用的頁面編輯器, ...
  • 學習網頁設計和網路編程可能是一種有趣而有意義的體驗,但需要時間,精力和練習.這裡有一些技巧可以幫助您更輕鬆地學習這些技能: 從基礎知識開始:在您深入研究高級主題之前,重要的是要有牢固的理解很重要基礎知識.首先學習HTML,CSS和JavaScript,這是網路的基礎語言. 使用線上資源:線上資源有許 ...
  • 概述 鎖是電腦協調多個進程或線程併發訪問某一資源的機制。在資料庫中,除傳統的計算資源(CPU、RAM、I/O)的爭用以外,數據也是一種供許多用戶共用的資源。如何保證數據併發訪問的一致性、有效性是所有資料庫必須解決的一個問題,鎖衝突也是影響資料庫併發訪問性能的一個重要因素。從這個角度來說,鎖對資料庫 ...
  • Java編程語言是由Sun微系統公司在20世紀90年代早期開發的。儘管Java主要用於基於internet的應用程式,但它是一種簡單、高效、通用的語言。Java最初是為運行在多個平臺上的嵌入式網路應用程式而設計的。它是一種可移植的、面向對象的解釋性語言。 Java是非常可移植的。相同的Java應用程 ...
  • VB.NET語言線上運行編譯,是一款可線上編程編輯器,在編輯器上輸入VB.NET語言代碼,點擊運行,可線上編譯運行VB.NET語言,VB.NET語言代碼線上運行調試,VB.NET語言線上編譯,可快速線上測試您的VB.NET語言代碼,線上編譯VB.NET語言代碼發現是否存在錯誤,如果代碼測試通過,將會 ...
  • 控制語句:程式預設是順序執行,但在實際項目中需要選擇、迴圈。 1 選擇控制語句if 1.1 if語句的形式 1 if(條件表達式) 2 {//複合語句,若幹條語句的集合 3 語句一; 4 語句二; 5 } 註意:如果條件成立執行大括弧里的所有語句,不成立的話大括弧里的語句都不執行。 if(條件表達式 ...
  • 導出word,以下為導出單個和zip的兩種格式。 CountDownLatch運用 CountDownLatch和ExecutorService 線程池cachedThreadPool.submit 1、CountDownLatch 概念 CountDownLatch可以使一個獲多個線程等待其他線程 ...
  • 如果熟悉 GIthub 我們經常可以在一些開源項目的 PR 上看到會配置測試的驗證以及覆蓋率的報告,並且可以強制覆蓋率不低於設定的值才可以進行 Merge PR。 1.測試 創建一個 xUnit 單元測試項目。 Class /// <summary> /// Represents a class w ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...