ASP.NET Core中如何限制響應發送速率(不是調用頻率)

来源:https://www.cnblogs.com/coredx/archive/2023/03/08/17195492.html
-Advertisement-
Play Games

前言 ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。 現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間 ...


前言

ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。
現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間傳輸流式數據的情況時很有用,避免少量請求耗盡網路帶寬,儘可能同時服務更多請求。

Tip

本文節選自我的新書《C#與.NET6 開發從入門到實踐》12.11 流量控制。實現方式偏向知識講解和教學,不保證組件穩定性,不建議直接在產品中使用。有關新書的更多介紹歡迎查看《C#與.NET6 開發從入門到實踐》預售,作者親自來打廣告了!
image

正文

用過百度網盤的人應該都深有體會,如果沒有會員,下載速度會非常慢。實現這種效果的方法有兩種:控制TCP協議的滑動視窗大小;控制響應流的寫入大小和頻率。偏向系統底層的流量控制軟體因為無法干涉軟體中的流,所以一般會直接控制內核TCP協議的滑動視窗大小;而下載軟體等客戶端應用通常直接控制流的寫入和讀取,此時TCP協議的擁塞控制演算法會自動調整滑動視窗大小。這種流量控制對提供大型多媒體資源的應用(例如線上視頻網站)非常重要,能防止一個請求的響應占用太多帶寬影響其他請求的響應發送。
ASP.NET Core並沒有原生提供相關功能,Nuget上也沒有找到相關的程式包(截止截稿)。但其實利用ASP.NET Core提供的介面,是可以實現這個功能的。筆者以ASP.NET Core的響應壓縮中間件為藍本,實現了一個簡單的響應限流中間件。

編寫節流組件

支持限速的基礎流

using System;

namespace AccessControlElementary;

/// <summary>
/// 支持流量控制的流
/// </summary>
public class ThrottlingStream : Stream
{
    /// <summary>
    /// 用於指定每秒可傳輸的無限位元組數的常數。
    /// </summary>
    public const long Infinite = 0;

    #region Private members
    /// <summary>
    /// 基礎流
    /// </summary>
    private readonly Stream _baseStream;

    /// <summary>
    /// 每秒可通過基礎流傳輸的最大位元組數。
    /// </summary>
    private long _maximumBytesPerSecond;

    /// <summary>
    /// 自上次限制以來已傳輸的位元組數。
    /// </summary>
    private long _byteCount;

    /// <summary>
    /// 最後一次限制的開始時間(毫秒)。
    /// </summary>
    private long _start;
    #endregion

    #region Properties

    /// <summary>
    /// 獲取當前毫秒數。
    /// </summary>
    /// <value>當前毫秒數。</value>
    protected long CurrentMilliseconds => Environment.TickCount;

    /// <summary>
    /// 獲取或設置每秒可通過基礎流傳輸的最大位元組數。
    /// </summary>
    /// <value>每秒最大位元組數。</value>
    public long MaximumBytesPerSecond
    {
        get => _maximumBytesPerSecond;
        set
        {
            if (MaximumBytesPerSecond != value)
            {
                _maximumBytesPerSecond = value;
                Reset();
            }
        }
    }

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持讀取。
    /// </summary>
    /// <returns>如果流支持讀取,則為true;否則為false。</returns>
    public override bool CanRead => _baseStream.CanRead;

    /// <summary>
    /// 獲取估算的流當前的比特率(單位:bps)。
    /// </summary>
    public long CurrentBitsPerSecond { get; protected set; }

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持定位。
    /// </summary>
    /// <value></value>
    /// <returns>如果流支持定位,則為true;否則為false。</returns>
    public override bool CanSeek => _baseStream.CanSeek;

    /// <summary>
    /// 獲取一個值,該值指示當前流是否支持寫入。
    /// </summary>
    /// <value></value>
    /// <returns>如果流支持寫入,則為true;否則為false。</returns>
    public override bool CanWrite => _baseStream.CanWrite;

    /// <summary>
    /// 獲取流的長度(以位元組為單位)。
    /// </summary>
    /// <value></value>
    /// <returns>一個long值,表示流的長度(位元組)。</returns>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Length => _baseStream.Length;

    /// <summary>
    /// 獲取或設置當前流中的位置。
    /// </summary>
    /// <value></value>
    /// <returns>流中的當前位置。</returns>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Position
    {
        get => _baseStream.Position;
        set => _baseStream.Position = value;
    }
    #endregion

    #region Ctor

    /// <summary>
    /// 使用每秒可傳輸無限位元組數的常數初始化 <see cref="T:ThrottlingStream"/> 類的新實例。
    /// </summary>
    /// <param name="baseStream">基礎流。</param>
    public ThrottlingStream(Stream baseStream)
        : this(baseStream, Infinite) { }

    /// <summary>
    /// 初始化 <see cref="T:ThrottlingStream"/> 類的新實例。
    /// </summary>
    /// <param name="baseStream">基礎流。</param>
    /// <param name="maximumBytesPerSecond">每秒可通過基礎流傳輸的最大位元組數。</param>
    /// <exception cref="ArgumentNullException">當 <see cref="baseStream"/> 是null引用時拋出。</exception>
    /// <exception cref="ArgumentOutOfRangeException">當 <see cref="maximumBytesPerSecond"/> 是負數時拋出.</exception>
    public ThrottlingStream(Stream baseStream, long maximumBytesPerSecond)
    {
        if (maximumBytesPerSecond < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maximumBytesPerSecond),
                maximumBytesPerSecond, "The maximum number of bytes per second can't be negatie.");
        }

        _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
        _maximumBytesPerSecond = maximumBytesPerSecond;
        _start = CurrentMilliseconds;
        _byteCount = 0;
    }
    #endregion

    #region Public methods

    /// <summary>
    /// 清除此流的所有緩衝區,並將所有緩衝數據寫入基礎設備。
    /// </summary>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    public override void Flush() => _baseStream.Flush();

    /// <summary>
    /// 清除此流的所有緩衝區,並將所有緩衝數據寫入基礎設備。
    /// </summary>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">位元組數組。當此方法返回時,緩衝區包含指定的位元組數組,其值介於offset和(offset+count-1)之間,由從當前源讀取的位元組替換。</param>
    /// <param name="offset">緩衝區中從零開始的位元組偏移量,開始存儲從當前流中讀取的數據。</param>
    /// <param name="count">從當前流中讀取的最大位元組數。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        Throttle(count);

        return _baseStream.Read(buffer, offset, count);
    }

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">位元組數組。當此方法返回時,緩衝區包含指定的位元組數組,其值介於offset和(offset+count-1)之間,由從當前源讀取的位元組替換。</param>
    /// <param name="offset">緩衝區中從零開始的位元組偏移量,開始存儲從當前流中讀取的數據。</param>
    /// <param name="count">從當前流中讀取的最大位元組數。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    /// <summary>
    /// 從當前流中讀取位元組序列,並將流中的位置前進讀取的位元組數。
    /// </summary>
    /// <param name="buffer">記憶體緩衝區。當此方法返回時,緩衝區包含讀取的數據。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <returns>
    /// 讀入緩衝區的位元組總數。如果許多位元組當前不可用,則該值可以小於請求的位元組數;如果已到達流的結尾,則該值可以小於零(0)。
    /// </returns>
    /// <exception cref="T:System.ArgumentException">偏移量和計數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持讀取。 </exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或讀取的最大位元組數為負。</exception>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await ThrottleAsync(buffer.Length, cancellationToken);
        return await _baseStream.ReadAsync(buffer, cancellationToken);
    }

    /// <summary>
    /// 設置當前流中的位置。
    /// </summary>
    /// <param name="offset">相對於參考點的位元組偏移量。</param>
    /// <param name="origin">類型為<see cref="T:System.IO.SeekOrigin"/>的值,指示用於獲取新位置的參考點。</param>
    /// <returns>
    /// 當前流中的新位置。
    /// </returns>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持定位,例如流是從管道或控制台輸出構造的。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _baseStream.Seek(offset, origin);
    }

    /// <summary>
    /// 設置當前流的長度。
    /// </summary>
    /// <param name="value">當前流的所需長度(位元組)。</param>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入和定位,例如流是從管道或控制台輸出構造的。</exception>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    public override void SetLength(long value)
    {
        _baseStream.SetLength(value);
    }

    /// <summary>
    /// 將位元組序列寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">位元組數組。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="offset">緩衝區中從零開始向當前流複製位元組的位元組偏移量。</param>
    /// <param name="count">要寫入當前流的位元組數。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        _baseStream.Write(buffer, offset, count);
    }

    /// <summary>
    /// 將位元組序列寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">位元組數組。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="offset">緩衝區中從零開始向當前流複製位元組的位元組偏移量。</param>
    /// <param name="count">要寫入當前流的位元組數。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    /// <summary>
    /// 將記憶體緩衝區寫入當前流,並按寫入的位元組數前進此流中的當前位置。
    /// </summary>
    /// <param name="buffer">記憶體緩衝區。此方法將要寫入當前流的位元組從緩衝區複製到當前流。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    /// <exception cref="T:System.IO.IOException">發生I/O錯誤。</exception>
    /// <exception cref="T:System.NotSupportedException">基礎流不支持寫入。</exception>
    /// <exception cref="T:System.ObjectDisposedException">方法在流關閉後被調用。</exception>
    /// <exception cref="T:System.ArgumentNullException">緩衝區為null。</exception>
    /// <exception cref="T:System.ArgumentException">偏移量和寫入位元組數之和大於緩衝區長度。</exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或寫入位元組數為負。</exception>
    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await ThrottleAsync(buffer.Length, cancellationToken);
        await _baseStream.WriteAsync(buffer, cancellationToken);
    }

    /// <summary>
    /// 返回一個表示當前<see cref="T:System.Object" />的<see cref="T:System.String" />。
    /// </summary>
    /// <returns>
    /// 表示當前<see cref="T:System.Object" />的<see cref="T:System.String" />。
    /// </returns>
    public override string ToString()
    {
        return _baseStream.ToString()!;
    }
    #endregion

    #region Protected methods

    /// <summary>
    /// 如果比特率大於最大比特率,嘗試限流
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    protected void Throttle(int bufferSizeInBytes)
    {
        var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
        if (toSleep > 1)
        {
            try
            {
                Thread.Sleep(toSleep);
            }
            catch (ThreadAbortException)
            {
                // 忽略ThreadAbortException。
            }

            // 睡眠已經完成,重置限流
            Reset();
        }
    }

    /// <summary>
    /// 如果比特率大於最大比特率,嘗試限流。
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    /// <param name="cancellationToken">取消令牌。</param>
    protected async Task ThrottleAsync(int bufferSizeInBytes, CancellationToken cancellationToken)
    {
        var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
        if (toSleep > 1)
        {
            try
            {
                await Task.Delay(toSleep, cancellationToken);
            }
            catch (TaskCanceledException)
            {
                // 忽略TaskCanceledException。
            }

            // 延遲已經完成,重置限流。
            Reset();
        }
    }

    /// <summary>
    /// 計算在操作流之前應當延遲的時間(單位:毫秒)。
    /// 更新流當前的比特率。
    /// </summary>
    /// <param name="bufferSizeInBytes">緩衝區大小(位元組)。</param>
    /// <returns>應當延遲的時間(毫秒)。</returns>
    protected int CaculateThrottlingMilliseconds(int bufferSizeInBytes)
    {
        int toSleep = 0;

        // 確保緩衝區不為null
        if (bufferSizeInBytes <= 0)
        {
            CurrentBitsPerSecond = 0;
        }
        else
        {
            _byteCount += bufferSizeInBytes;
            long elapsedMilliseconds = CurrentMilliseconds - _start;

            if (elapsedMilliseconds > 0)
            {
                // 計算當前瞬時比特率
                var bp = _byteCount * 1000L;
                var bps = bp / elapsedMilliseconds;
                var avgBps = bps;

                //如果bps大於最大bps,返回應當延遲的時間。
                if (_maximumBytesPerSecond > 0 && bps > _maximumBytesPerSecond)
                {
                    // 計算延遲時間
                    long wakeElapsed = bp / _maximumBytesPerSecond;
                    var result = (int)(wakeElapsed - elapsedMilliseconds);
                    // 計算平均比特率
                    var div = result / 1000.0;
                    avgBps = (long)(bps / (div == 0 ? 1 : div));

                    if (result > 1)
                    {
                        toSleep = result; ;
                    }
                }
                // 更新當前(平均)比特率
                CurrentBitsPerSecond = (long)(avgBps / 8);
            }
        }

        return toSleep;
    }

    /// <summary>
    /// 將位元組數重置為0,並將開始時間重置為當前時間。
    /// </summary>
    protected void Reset()
    {
        long difference = CurrentMilliseconds - _start;

        // 只有在已知歷史記錄可用時間超過1秒時才重置計數器。
        if (difference > 1000)
        {
            _byteCount = 0;
            _start = CurrentMilliseconds;
        }
    }

    #endregion
}

CaculateThrottleMilliseconds 、Throttle和ThrottleAsync是這個流的核心。CaculateThrottleMilliseconds方法負責計算在寫入或讀取流之前應該延遲多久和更新流當前的傳輸速率,Throttle和ThrottleAsync方法負責同步和非同步延遲。

限流響應正文

using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using System.IO.Pipelines;
using System;

namespace AccessControlElementary;

// 自定義的HTTP功能介面,提供獲取限流速率設置和當前速率的獲取能力
public interface IHttpResponseThrottlingFeature
{
    public long? MaximumBytesPerSecond { get; }
    public long? CurrentBitsPerSecond { get; }
}

// 限流響應正文的實現類,實現了自定義的功能介面
public class ThrottlingResponseBody : Stream, IHttpResponseBodyFeature, IHttpResponseThrottlingFeature
{
    private readonly IHttpResponseBodyFeature _innerBodyFeature;
    private readonly IOptionsSnapshot<ResponseThrottlingOptions> _options;
    private readonly HttpContext _httpContext;
    private readonly Stream _innerStream;

    private ThrottlingStream? _throttlingStream;
    private PipeWriter? _pipeAdapter;
    private bool _throttlingChecked;
    private bool _complete;
    private int _throttlingRefreshCycleCount;

    public ThrottlingResponseBody(IHttpResponseBodyFeature innerBodyFeature, HttpContext httpContext, IOptionsSnapshot<ResponseThrottlingOptions> options)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _httpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
        _innerBodyFeature = innerBodyFeature ?? throw new ArgumentNullException(nameof(innerBodyFeature));
        _innerStream = innerBodyFeature.Stream;
        _throttlingRefreshCycleCount = 0;
    }

    public override bool CanRead => false;

    public override bool CanSeek => false;

    public override bool CanWrite => _innerStream.CanWrite;

    public override long Length => _innerStream.Length;

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public Stream Stream => this;

    public PipeWriter Writer
    {
        get
        {
            if (_pipeAdapter == null)
            {
                _pipeAdapter = PipeWriter.Create(Stream, new StreamPipeWriterOptions(leaveOpen: true));
                if (_complete)
                {
                    _pipeAdapter.Complete();
                }
            }

            return _pipeAdapter;
        }
    }

    public long? MaximumBytesPerSecond => _throttlingStream?.MaximumBytesPerSecond;

    public long? CurrentBitsPerSecond => _throttlingStream?.CurrentBitsPerSecond;

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count)
    {
        OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();

        if (_throttlingStream != null)
        {
            _throttlingStream.Write(buffer, offset, count);
            _throttlingStream.Flush();
        }
        else
        {
            _innerStream.Write(buffer, offset, count);
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await OnWriteAsync();

        if (_throttlingStream != null)
        {
            await _throttlingStream.WriteAsync(buffer, cancellationToken);
            await _throttlingStream.FlushAsync(cancellationToken);
        }
        else
        {
            await _innerStream.WriteAsync(buffer, cancellationToken);
        }
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
    {
        var tcs = new TaskCompletionSource(state: state, TaskCreationOptions.RunContinuationsAsynchronously);
        InternalWriteAsync(buffer, offset, count, callback, tcs);
        return tcs.Task;
    }

    private async void InternalWriteAsync(byte[] buffer, int offset, int count, AsyncCallback? callback, TaskCompletionSource tcs)
    {
        try
        {
            await WriteAsync(buffer.AsMemory(offset, count));
            tcs.TrySetResult();
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }

        if (callback != null)
        {
            // Offload callbacks to avoid stack dives on sync completions.
            var ignored = Task.Run(() =>
            {
                try
                {
                    callback(tcs.Task);
                }
                catch (Exception)
                {
                    // Suppress exceptions on background threads.
                }
            });
        }
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
        {
            throw new ArgumentNullException(nameof(asyncResult));
        }

        var task = (Task)asyncResult;
        task.GetAwaiter().GetResult();
    }

    public async Task CompleteAsync()
    {
        if (_complete)
        {
            return;
        }

        await FinishThrottlingAsync(); // Sets _complete
        await _innerBodyFeature.CompleteAsync();
    }

    public void DisableBuffering()
    {
        _innerBodyFeature?.DisableBuffering();
    }

    public override void Flush()
    {
        if (!_throttlingChecked)
        {
            OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
            // Flush the original stream to send the headers. Flushing the compression stream won't
            // flush the original stream if no data has been written yet.
            _innerStream.Flush();
            return;
        }

        if (_throttlingStream != null)
        {
            _throttlingStream.Flush();
        }
        else
        {
            _innerStream.Flush();
        }
    }

    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        if (!_throttlingChecked)
        {
            await OnWriteAsync();
            // Flush the original stream to send the headers. Flushing the compression stream won't
            // flush the original stream if no data has been written yet.
            await _innerStream.FlushAsync(cancellationToken);
            return;
        }

        if (_throttlingStream != null)
        {
            await _throttlingStream.FlushAsync(cancellationToken);
            return;
        }

        await _innerStream.FlushAsync(cancellationToken);
    }

    public async Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellationToken)
    {
        await OnWriteAsync();

        if (_throttlingStream != null)
        {
            await SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellationToken);
            return;
        }

        await _innerBodyFeature.SendFileAsync(path, offset, count, cancellationToken);
    }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await OnWriteAsync();
        await _innerBodyFeature.StartAsync(cancellationToken);
    }

    internal async Task FinishThrottlingAsync()
    {
        if (_complete)
        {
            return;
        }

        _complete = true;

        if (_pipeAdapter != null)
        {
            await _pipeAdapter.CompleteAsync();
        }

        if (_throttlingStream != null)
        {
            await _throttlingStream.DisposeAsync();
        }
    }

    private async Task OnWriteAsync()
    {
        if (!_throttlingChecked)
        {
            _throttlingChecked = true;
            var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
            _throttlingStream = new ThrottlingStream(_innerStream, maxValue < 0 ? 0 : maxValue);
        }

        if (_throttlingStream != null && _options?.Value?.ThrottlingRefreshCycle > 0)
        {
            if (_throttlingRefreshCycleCount >= _options.Value.ThrottlingRefreshCycle)
            {
                _throttlingRefreshCycleCount = 0;

                var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
                _throttlingStream.MaximumBytesPerSecond = maxValue < 0 ? 0 : maxValue;
            }
            else
            {
                _throttlingRefreshCycleCount++;
            }
        }
    }
}

自定義的響應正文類必須實現IHttpResponseBodyFeature介面才能作為應用的底層響應流使用,設計和實現參考ASP.NET Core的ResponseCompressionBody。

響應限流中間件

using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using Timer = System.Timers.Timer;

namespace AccessControlElementary;

public class ResponseThrottlingMiddleware
{
    private readonly RequestDelegate _next;

    public ResponseThrottlingMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task Invoke(HttpContext context, IOptionsSnapshot<ResponseThrottlingOptions> options, ILogger<ResponseThrottlingMiddleware> logger)
    {
        ThrottlingResponseBody throttlingBody = null;
        IHttpResponseBodyFeature originalBodyFeature = null;

        var shouldThrottling = await options?.Value?.ShouldThrottling?.Invoke(context);
        if (shouldThrottling == true)
        {
            //獲取原始輸出Body
            originalBodyFeature = context.Features.Get<IHttpResponseBodyFeature>();
            //初始化限流Body
            throttlingBody = new ThrottlingResponseBody(originalBodyFeature, context, options);
            //設置成限流Body
            context.Features.Set<IHttpResponseBodyFeature>(throttlingBody);
            context.Features.Set<IHttpResponseThrottlingFeature>(throttlingBody);
            // 用定時器定期向外彙報信息,這可能導致性能下降,僅用於演示目的
            var timer = new Timer(1000);
            timer.AutoReset = true;
            long? currentBitsPerSecond = null;
            var traceIdentifier = context.TraceIdentifier;

            timer.Elapsed += (sender, arg) =>
            {
                if (throttlingBody.CurrentBitsPerSecond != currentBitsPerSecond)
                {
                    currentBitsPerSecond = throttlingBody.CurrentBitsPerSecond;

                    var bps = (double)(throttlingBody.CurrentBitsPerSecond ?? 0);
                    var (unitBps, unit) = bps switch
                    {
                        < 1000 => (bps, "bps"),
                        < 1000_000 => (bps / 1000, "kbps"),
                        _ => (bps / 1000_000, "mbps"),
                    };

                    logger.LogDebug("請求:{RequestTraceIdentifier} 當前響應發送速率:{CurrentBitsPerSecond} {Unit}。", traceIdentifier, unitBps, unit);
                }
            };

            // 開始發送響應後啟動定時器
            context.Response.OnStarting(async () =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 開始發送響應。", traceIdentifier);
                timer.Start();
            });

            // 響應發送完成後銷毀定時器
            context.Response.OnCompleted(async () =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 響應發送完成。", traceIdentifier);
                timer.Stop();
                timer?.Dispose();
            });

            // 請求取消後銷毀定時器
            context.RequestAborted.Register(() =>
            {
                logger.LogInformation("請求:{RequestTraceIdentifier} 已中止。", traceIdentifier);
                timer.Stop();
                timer?.Dispose();
            });
        }

        try
        {
            await _next(context);
            if (shouldThrottling == true)
            {
                // 刷新響應流,確保所有數據都發送到網卡
                await throttlingBody.FinishThrottlingAsync();
            }
        }
        finally
        {
            if (shouldThrottling == true)
            {
                //限流發生錯誤,恢複原始Body
                context.Features.Set(originalBodyFeature);
            }
        }
    }
}

中間件負責把基礎響應流替換為限流響應流,併為每個請求重新讀取選項,使每個請求都能夠獨立控制限流的速率,然後在響應發送啟動後記錄響應的發送速率。

響應限流選項

namespace AccessControlElementary;

public class ResponseThrottlingOptions
{
    /// <summary>
    /// 獲取或設置流量限制的值的刷新周期,刷新時會重新調用<see cref="ThrottlingProvider"/>設置限制值。
    /// 值越大刷新間隔越久,0或負數表示永不刷新。
    /// </summary>
    public int ThrottlingRefreshCycle { get; set; }

    /// <summary>
    /// 獲取或設置指示是否應該啟用流量控制的委托
    /// </summary>
    public Func<HttpContext, Task<bool>> ShouldThrottling { get; set; }

    /// <summary>
    /// 獲取或設置指示流量限制大小的委托(單位:Byte/s)
    /// </summary>
    public Func<HttpContext, Task<int>> ThrottlingProvider { get; set; }
}

響應限流服務註冊和中間件配置擴展

namespace AccessControlElementary;

// 配置中間件用的輔助類和擴展方法
public static class ResponseThrottlingMiddlewareExtensions
{
    public static IApplicationBuilder UseResponseThrottling(this IApplicationBuilder app)
    {
        return app.UseMiddleware<ResponseThrottlingMiddleware>();
    }
}

// 註冊中間件需要的服務的輔助類和擴展方法
public static class ResponseThrottlingServicesExtensions
{
    public static IServiceCollection AddResponseThrottling(this IServiceCollection services, Action<ResponseThrottlingOptions> configureOptions = null)
    {
        services.Configure(configureOptions);
        return services;
    }
}

使用節流組件

服務註冊和請求管道配置

Startup啟動配置

namespace AccessControlElementary;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // 註冊限流服務和選項
        services.AddResponseThrottling(options =>
        {
            options.ThrottlingRefreshCycle = 100;
            options.ShouldThrottling = static async _ => true;
            options.ThrottlingProvider = static async _ => 100 * 1024; // 100KB/s
        });

        services.AddRazorPages();
    }

    public void Configure(IApplicationBuilder app)
    {
        // 配置響應限流中間件
        app.UseResponseThrottling();

        app.UseStaticFiles();

        app.UseRouting();

        app.UseAuthentication();
        app.UseAuthorization();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapRazorPages();
        });
    }
}

示例展示瞭如何配置和啟用響應限流。ThrottlingRefreshCycle設置為每100次響應流寫入周期刷新一次流量限制的值,使限流值能在響應發送中動態調整;ShouldThrottling設置為無條件啟用限流;ThrottlingProvider設置為限速100 KB/s。
請求只有在UseResponseThrottling之前配置的短路中間件處被處理時不會受影響,請求沒有被短路的話,只要經過限流中間件,基礎響應流就被替換了。如果同時使用了響應壓縮,會變成限流響應包裹壓縮響應(或者相反),壓縮響應(或者限流響應)又包裹基礎響應的嵌套結構。

結語

本書在介紹.NET 6基礎知識時會儘可能使用具有現實意義的示例避免學習和實踐脫節,本文就是其中之一,如果本文對您有價值,歡迎繼續瞭解和購買本書。《C#與.NET6 開發從入門到實踐》預售,作者親自來打廣告了!

本文地址:https://www.cnblogs.com/coredx/p/17195492.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 說起開源CMS,你會想到哪些呢?WordPress?DoraCMS?joomla? 今天再給大家推薦一個非常好用的開源CMS:Wagtail 如果您正在選型的話,可以瞭解一下Wagtail的特點: 基於Django構建,具有出色的文檔管理功能和友好的用戶界面。 提供了一個靈活且易於使用的頁面編輯器, ...
  • 學習網頁設計和網路編程可能是一種有趣而有意義的體驗,但需要時間,精力和練習.這裡有一些技巧可以幫助您更輕鬆地學習這些技能: 從基礎知識開始:在您深入研究高級主題之前,重要的是要有牢固的理解很重要基礎知識.首先學習HTML,CSS和JavaScript,這是網路的基礎語言. 使用線上資源:線上資源有許 ...
  • 概述 鎖是電腦協調多個進程或線程併發訪問某一資源的機制。在資料庫中,除傳統的計算資源(CPU、RAM、I/O)的爭用以外,數據也是一種供許多用戶共用的資源。如何保證數據併發訪問的一致性、有效性是所有資料庫必須解決的一個問題,鎖衝突也是影響資料庫併發訪問性能的一個重要因素。從這個角度來說,鎖對資料庫 ...
  • Java編程語言是由Sun微系統公司在20世紀90年代早期開發的。儘管Java主要用於基於internet的應用程式,但它是一種簡單、高效、通用的語言。Java最初是為運行在多個平臺上的嵌入式網路應用程式而設計的。它是一種可移植的、面向對象的解釋性語言。 Java是非常可移植的。相同的Java應用程 ...
  • VB.NET語言線上運行編譯,是一款可線上編程編輯器,在編輯器上輸入VB.NET語言代碼,點擊運行,可線上編譯運行VB.NET語言,VB.NET語言代碼線上運行調試,VB.NET語言線上編譯,可快速線上測試您的VB.NET語言代碼,線上編譯VB.NET語言代碼發現是否存在錯誤,如果代碼測試通過,將會 ...
  • 控制語句:程式預設是順序執行,但在實際項目中需要選擇、迴圈。 1 選擇控制語句if 1.1 if語句的形式 1 if(條件表達式) 2 {//複合語句,若幹條語句的集合 3 語句一; 4 語句二; 5 } 註意:如果條件成立執行大括弧里的所有語句,不成立的話大括弧里的語句都不執行。 if(條件表達式 ...
  • 導出word,以下為導出單個和zip的兩種格式。 CountDownLatch運用 CountDownLatch和ExecutorService 線程池cachedThreadPool.submit 1、CountDownLatch 概念 CountDownLatch可以使一個獲多個線程等待其他線程 ...
  • 如果熟悉 GIthub 我們經常可以在一些開源項目的 PR 上看到會配置測試的驗證以及覆蓋率的報告,並且可以強制覆蓋率不低於設定的值才可以進行 Merge PR。 1.測試 創建一個 xUnit 單元測試項目。 Class /// <summary> /// Represents a class w ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...