> 註:本文隸屬於《理解ASP.NET Core》系列文章,請查看置頂博客或[點擊此處查看全文目錄](https://www.cnblogs.com/xiaoxiaotank/p/15185288.html) # 概述 在微服務化的架構設計中,網關扮演著重要的看門人角色,它所提供的功能之一就是**限 ...
註:本文隸屬於《理解ASP.NET Core》系列文章,請查看置頂博客或點擊此處查看全文目錄
概述
在微服務化的架構設計中,網關扮演著重要的看門人角色,它所提供的功能之一就是限流。而對於眾多非微服務化的系統來說,可能並不會部署網關(無論是因為成本還是複雜度),在這種場景下,為了實現限流,微軟在 .NET 7 中提供了官方的限流中間件。下麵我們一起來看一下。
註冊限流策略
首先,確保你的應用依賴的 SDK 版本 >= 7,接著通過AddRateLimiter
擴展方法註冊限流服務,並添加限流策略,然後通過UseRateLimiter
啟用限流中間件,最後配置某個路由的請求使用限流策略:
builder.Services.AddRateLimiter(limiterOptions =>
{
// 配置限流策略
});
app.UseRateLimiter();
app.MapGet("LimitTest", async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return Results.Ok($"Limiter");
}).RequireRateLimiting("my_policy");
微軟為我們提供了 4 種常用的限流演算法:
- FixedWindowLimiter:固定視窗限流器
- SlidingWindowLimiter:滑動視窗限流器
- TokenBucketLimiter:令牌桶限流器
- ConcurrencyLimiter:併發限流器
我們通常會註冊一個命名限流策略,併在該策略內指定限流演算法,以及其他限流邏輯。
另外,需要關註一下UseRateLimiter
的調用位置。若限流行為作用於特定路由,則限流中間件必須放置在UseRouting
之後。
FixedWindowLimiter
固定視窗限流器是一種簡單的限流方式:
- 工作原理:使用固定的時間長度來限制請求數量。假設固定視窗長度為10s,則每10s就會切換(銷毀並創建)一個新的視窗,在每個單獨的視窗內,限制請求流量。
- 特點:
- 優點:實現簡單,占用記憶體低
- 缺點:
- 當視窗中請求流量到達閾值時,流量會被瞬間切斷,不能平滑地處理突發流量(實際應用中理想效果是讓流量平滑地進入系統中)
- 視窗切換時可能會出現 2 倍請求流量。比如視窗大小為 1s,閾值為100,視窗 1 在後 500ms 內處理了 100 個請求,視窗 2 在前 500ms 內也處理了 100 個請求,這樣就導致在 1s 內處理了 200 個請求
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddFixedWindowLimiter(policyName: "fixed", fixedOptions =>
{
fixedOptions.PermitLimit = 4;
fixedOptions.Window = TimeSpan.FromSeconds(60);
fixedOptions.QueueLimit = 2;
fixedOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
fixedOptions.AutoReplenishment = true;
});
});
public sealed class FixedWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddFixedWindowLimiter
添加了一個固定視窗限流策略,並指定策略名為fixed
。它的含義是視窗時間長度為60s,在每個視窗時間範圍內,最多允許4個請求被處理。
各配置項含義如下:
- PermitLimit:視窗閾值,即每個視窗時間範圍內,最多允許的請求個數。這裡指定為最多允許4個請求。該值必須 > 0
- Window:視窗大小,即時間長度。這裡設置為 60 s。該值必須 >
TimeSpan.Zero
- QueueLimit:
- 當視窗請求數達到最大時,後續請求會進入排隊,用於設置隊列的大小(即允許幾個請求在裡面排隊等待)
- 這裡設置為隊列中最多允許 2 個請求排隊,也就是說,在這個視窗內,可以最多有6個請求,4個會被處理,2個則在排隊,其他的則會在一定時間後拒絕返回
RejectionStatusCode
- 該值必須 >= 0
- QueueProcessingOrder:排隊請求的處理順序。這裡設置為優先處理先來的請求
- AutoReplenishment:指示開啟新視窗時是否自動重置請求限制,該值預設為
true
。如果設置為false
,則需要手動調用FixedWindowRateLimiter.TryReplenish
來重置
SlidingWindowLimiter
滑動視窗限流器是固定視窗限流器的升級版:
- 工作原理:
- 在固定視窗限流器的基礎上,它再將每個視窗劃分為多個段,每經過一個段的時間間隔(= 視窗時間 / 視窗段的個數),視窗就會向後滑動一段,所以稱為滑動視窗(視窗大小仍是固定的)。
- 當視窗滑動後,會“吃進”一個段(稱為當前段),並“吐出”一個段(稱為過期段),過期段會被回收,回收的請求數可以用於當前段。
- 特點:
- 優點:按段滑動處理,相對於固定視窗來說,可以對流量進行更精準的控制,更平滑的處理突發流量,並且段劃分的越多,移動更平滑。
- 缺點:對時間精度要求高,比固定視窗實現複雜,記憶體占用更高
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddSlidingWindowLimiter(policyName: "sliding", slidingOptions =>
{
slidingOptions.PermitLimit = 100;
slidingOptions.Window = TimeSpan.FromSeconds(30);
slidingOptions.QueueLimit = 2;
slidingOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
slidingOptions.AutoReplenishment = true;
slidingOptions.SegmentsPerWindow = 3;
});
});
public sealed class SlidingWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public int SegmentsPerWindow { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddSlidingWindowLimiter
添加了一個滑動視窗限流策略,並指定策略名為sliding
。它的含義是視窗時間長度為30s,在每個視窗時間範圍內,最多允許100個請求,視窗段數為 3,每個段的時間間隔為 30s / 3 = 10s,即視窗每 10s 滑動一段。
各配置項含義如下:
- PermitLimit:視窗閾值,即每個視窗時間範圍內,最多允許的請求個數。這裡指定為最多允許100個請求。該值必須 > 0
- Window:視窗大小,即時間長度。這裡設置為 30 s。該值必須 >
TimeSpan.Zero
- QueueLimit:
- 當視窗請求數達到最大時,後續請求會進入排隊,用於設置隊列的大小(即允許幾個請求在裡面排隊等待)
- 這裡設置為隊列中最多允許 2 個請求排隊
- 該值必須 >= 0
- QueueProcessingOrder:排隊請求的處理順序。這裡設置為優先處理先來的請求
- AutoReplenishment:指示開啟新視窗時是否自動重置請求限制,該值預設為
true
。如果設置為false
,則需要手動調用SlidingWindowRateLimiter.TryReplenish
來重置 - SegmentsPerWindow:每個視窗的段的個數,通過它可以計算出每個段滑動的時間間隔。這裡設置段數為 3,時間間隔為 10s。該值必須 > 0
為了更好地理解滑動視窗限流器的工作原理,下麵我會借用官方文檔提供的一張圖來詳細解釋一下:
假設:限制每個視窗的請求數為 100,視窗時間為 30s,每個視窗的段數為 3,那麼每個段的時間間隔就是 30s / 3 = 10s。
定義:當前段結存請求數 = 當前段可用請求數 - 處理請求數 + 回收請求數
限流器工作流程:
- 在第 1 個段時(0s~10s),當前段可用請求數為 100,處理了 20 個請求,回收請求 0 個,那麼結存請求數 = 100 - 20 + 0 = 80
- 在第 2 個段時(10s~20s),當前段可用請求數為 80,處理了 30 個請求,回收請求 0 個,那麼結存請求數 = 80 - 30 + 0 = 50
- 在第 3 個段時(20s~30s),當前段可用請求數為 50,處理了 40 個請求,回收請求 0 個,那麼結存請求數 = 50 - 40 + 0 = 10
- 在第 4 個段時(30s~40s),當前段可用請求數為 10,處理了 30 個請求,回收第 1 個段的請求 20 個,那麼結存請求數 = 10 - 30 + 20 = 0
- 在第 5 個段時(40s~50s),當前段可用請求數為 0,處理了 10 個請求,回收第 2 個段的請求 30 個,那麼結存請求數 = 0 - 10 + 30 = 20
- 在第 6 個段時(50s~60s),當前段可用請求數為 20,處理了 10 個請求,回收第 3 個段的請求 40 個,那麼結存請求數 = 20 - 10 + 40 = 50
TokenBucketLimiter
令牌桶限流器是一種限制數據平均傳輸速率的限流演算法:
- 工作原理:想象有一個桶,每個固定時間段會向桶內放入固定數量的令牌(token),當桶內令牌裝滿時,新的令牌將會被丟棄。當請求流量進入時,會先從桶內拿 1 個令牌,拿到了則該請求會被處理,沒拿到則會在隊列中等待,若隊列已滿,則會被限流拒絕處理。
- 特點:可以限制數據的平均傳輸速率,還可以一次性耗盡令牌應對突發流量,並平滑地處理後續流量,是一種通用的演算法
以下圖為例,桶內有 3 個令牌(token),進來了 5 個請求,前三個請求可以拿到令牌(token),它們會被處理,後面兩個就只能排隊或被限流拒絕。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddTokenBucketLimiter(policyName: "token_bucket", tokenBucketOptions =>
{
tokenBucketOptions.TokenLimit = 4;
tokenBucketOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
tokenBucketOptions.TokensPerPeriod = 2;
tokenBucketOptions.QueueLimit = 2;
tokenBucketOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
tokenBucketOptions.AutoReplenishment = true;
});
});
public sealed class TokenBucketRateLimiterOptions
{
public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;
public int TokensPerPeriod { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int TokenLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddTokenBucketLimiter
添加了一個令牌桶限流策略,並指定策略名為token_bucket
。它的含義是桶最多可以裝 4 個令牌,每 10s 發放一次令牌,每次發放 2 個令牌,所以在一個發放周期內,最多可以處理 4 個請求,至少可以處理 2 個請求
各配置項含義如下:
- TokenLimit:桶最多可以裝的令牌數,發放的多餘令牌會被丟棄。這裡設置為最多裝 4 個令牌。該值必須 > 0
- ReplenishmentPeriod:令牌發放周期,即多長時間發放一次令牌。這裡設置為 10 s。該值必須 >
TimeSpan.Zero
- TokensPerPeriod:每個周期發放的令牌數,即每個周期向桶內放入的令牌數(若超過桶可裝令牌數的最大值,則會被丟棄)。這裡設置為 2 個。該值必須 > 0
- QueueLimit:
- 當桶內的令牌全部被拿完(token 數為 0)時,後續請求會進入排隊,用於設置隊列的大小(即允許幾個請求在裡面排隊等待)
- 這裡設置為隊列中最多允許 2 個請求排隊
- 該值必須 >= 0
- QueueProcessingOrder:排隊請求的處理順序。這裡設置為優先處理先來的請求
- AutoReplenishment:指示當進入新的令牌發放周期時,是否自動發放令牌,該值預設為
true
。如果設置為false
,則需要手動調用TokenBucketRateLimiter.TryReplenish
來發放
ConcurrencyLimiter
併發限流器不是限制一段時間內的最大請求數,而是限制併發數:
- 工作原理:限制同一時刻併發請求的數量
- 特點:可以充分利用伺服器性能,當出現突發流量時,伺服器負載可能會持續過高。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddConcurrencyLimiter(policyName: "concurrency", concurrencyOptions =>
{
concurrencyOptions.PermitLimit = 4;
concurrencyOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
concurrencyOptions.QueueLimit = 2;
});
});
public sealed class ConcurrencyLimiterOptions
{
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddConcurrencyLimiter
添加了一個併發限流策略,並指定策略名為concurrency
。它的含義是最多可以併發4個請求被處理。
各配置項含義如下:
- PermitLimit:最多併發的請求數。該值必須 > 0
- QueueLimit:
- 當併發請求數達到最大時,後續請求會進入排隊,用於設置隊列的大小(即允許幾個請求在裡面排隊等待)
- 這裡設置為隊列中最多允許 2 個請求排隊
- 該值必須 >= 0
- QueueProcessingOrder:排隊請求的處理順序。這裡設置為優先處理先來的請求
RateLimiterOptions
上面已經把常用的限流演算法介紹完了,下麵來看一下可以通過limiterOptions
進行哪些配置:
public sealed class RateLimiterOptions
{
// 僅保留了常用的配置項,其他相關代碼均忽略
// 全局限流器
public PartitionedRateLimiter<HttpContext>? GlobalLimiter { get; set; }
// 當請求被限流拒絕時執行
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; set; }
// 當期你去被限流拒絕時的 Http 響應狀態碼
public int RejectionStatusCode { get; set; } = StatusCodes.Status503ServiceUnavailable;
}
GlobalLimiter
通過GlobalLimiter
,我們可以設置全局限流器,更準確的說法是全局分區限流器,該限流器會應用於所有請求。執行順序為先執行全局限流器,再執行特定於路由終結點的限流器(如果存在的話)。
需要註意的是,相對於上面註冊的限流策略來說,GlobalLimiter
已經是一個限流器實例了,所以需要分配給他一個分區限流器實例,通過PartitionedRateLimiter.Create
來創建。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, IPAddress>(context =>
{
IPAddress? remoteIpAddress = context.Connection.RemoteIpAddress;
// 針對非迴環地址限流
if (!IPAddress.IsLoopback(remoteIpAddress!))
{
return RateLimitPartition.GetTokenBucketLimiter
(remoteIpAddress!, _ =>
new TokenBucketRateLimiterOptions
{
TokenLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2,
ReplenishmentPeriod = TimeSpan.FromSeconds(10),
TokensPerPeriod = 10,
AutoReplenishment = true
});
}
// 若為迴環地址,則不限流
return RateLimitPartition.GetNoLimiter(IPAddress.Loopback);
});
});
鏈式組合的限流器
它並不是一個新類型的限流器,而是可以將我們上面提到的分區限流器進行組合而得到一個新的分區限流器。
例如我可以將包含固定視窗限流邏輯的分區限流器和將包含併發限流邏輯的分區限流器組合進行組合,那麼應用該限流器的請求就會先被固定視窗限流器處理,再被併發限流器處理,任意一個被限流,就會被拒絕。
var chainedLimiter = PartitionedRateLimiter.CreateChained(
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetFixedWindowLimiter
(userAgent, _ =>
new FixedWindowRateLimiterOptions
{
AutoReplenishment = true,
PermitLimit = 4,
Window = TimeSpan.FromSeconds(2)
});
}),
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetConcurrencyLimiter
(userAgent, _ =>
new ConcurrencyLimiterOptions
{
PermitLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2
});
})
);
目前鏈式組合的限流器只能用於全局限流器,而不能用於終結點限流器。
RejectionStatusCode
通過RejectionStatusCode
,我們可以設置請求被限流拒絕後,http預設的響應狀態碼。預設為 503 服務不可用,我們可以指定為 429 過多的請求。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
});
另外,該狀態碼可以在OnRejected
中被重寫,具體參見下小節。
OnRejected
當請求被限流時,會觸發回調OnRejected
,通過該委托我們可以針對 http 響應進行自定義配置:
- RetryAfter:設置響應頭
Retry-After
,指示多長時間後重試請求。需要註意的是,併發限流器無法獲取到 RetryAfter,因為它不是時間段的限流,而是限制的併發數
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.OnRejected = (context, cancellationToken) =>
{
if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
{
context.HttpContext.Response.Headers.RetryAfter =
((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo);
}
// 可以重新設置響應狀態碼,會覆蓋掉上面設置的 limiterOptions.RejectionStatusCod
context.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.HttpContext.RequestServices.GetService<ILoggerFactory>()?
.CreateLogger("Microsoft.AspNetCore.RateLimitingMiddleware")
.LogWarning("OnRejected: {GetUserEndPoint}", GetUserEndPoint(context.HttpContext));
return ValueTask.CompletedTask;
};
});
自定義限流策略
上述提到的限流策略,並不能滿足我們所有的需求,所以瞭解如何自定義限流策略是我們的必修課。
在開始編碼之前,你需要瞭解以下內容:
- 上述使用
AddXXXLimiter
添加的限流策略,內部實際上調用了AddPolicy
(後面的部分會詳細介紹) - 上述使用
AddXXXLimiter
添加的限流策略,每種策略只有一個分區,即使用了該限流策略的路由共用一個分區。例如通過AddFixedWindowLimiter
添加了限流策略“fixed”,視窗閾值為 10,並有 10 個路由使用了該策略,那麼在一個視窗內,這 10 個路由總的請求數達到 10,那這 10 個路由後續的請求都會被限流。
下麵我們就藉助AddPolicy
,分別使用兩種方式添加一個自定義策略“my_policy”:一個用戶一個分區,匿名用戶共用一個分區
通過委托創建自定義限流策略
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy(policyName: "my_policy", httpcontext =>
{
var userId = "anonymous user";
if (httpcontext.User.Identity?.IsAuthenticated is true)
{
userId = httpcontext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
});
});
通過實現 IRateLimiterPolicy 創建自定義限流策略
public interface IRateLimiterPolicy<TPartitionKey>
{
// 若不為空,則執行它(不會執行全局的),如果它為空,則執行全局的
Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
// 獲取限流分區
RateLimitPartition<TPartitionKey> GetPartition(HttpContext httpContext);
}
public class MyRateLimiterPolicy : IRateLimiterPolicy<string>
{
// 可以通過依賴註入參數
public MyRateLimiterPolicy(ILogger<MyRateLimiterPolicy> logger)
{
// 可以設置自己的限流拒絕回調邏輯,而不使用上面全局設置的 limiterOptions.OnRejected
OnRejected = (ctx, token) =>
{
ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
logger.LogWarning($"Request rejected by {nameof(MyRateLimiterPolicy)}");
return ValueTask.CompletedTask;
};
}
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
public RateLimitPartition<string> GetPartition(HttpContext httpContext)
{
var userId = "anonymous user";
if (httpContext.User.Identity?.IsAuthenticated is true)
{
userId = httpContext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
}
}
// 記得註冊它
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy<string, MyRateLimiterPolicy>(policyName: "my_policy");
}
應用限流策略
RequireRateLimiting & DisableRateLimiting
可以一次性為所有 controller 應用限流策略
app.MapControllers().RequireRateLimiting("fixed");
也可以為指定路由應用限流策略
app.MapGet("LimitTest", () =>{ }).RequireRateLimiting("fixed");
實質上,RequireRateLimiting
和DisableRateLimiting
是通過向終結點元數據中EnableRateLimiting
和DisableRateLimiting
兩個特性來實現的。
public static class RateLimiterEndpointConventionBuilderExtensions
{
public static TBuilder RequireRateLimiting<TBuilder>(this TBuilder builder, string policyName) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(policyName)));
return builder;
}
public static TBuilder RequireRateLimiting<TBuilder, TPartitionKey>(this TBuilder builder, IRateLimiterPolicy<TPartitionKey> policy) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder =>
{
endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(new
DefaultRateLimiterPolicy(
RateLimiterOptions.ConvertPartitioner<TPartitionKey>(null, policy.GetPartition), policy.OnRejected)));
});
return builder;
}
public static TBuilder DisableRateLimiting<TBuilder>(this TBuilder builder) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(DisableRateLimitingAttribute.Instance));
return builder;
}
}
EnableRateLimitingAttribute & DisableRateLimitingAttribute
在Controller
層面,我們可以方便的使用特性來標註使用或禁用限流策略。這兩個特性可以標註在Controller
類上,也可以標註在類的方法上。
但需要註意的時,如果前面使用了RequireRateLimiting
或DisableRateLimiting
擴展方法,由於它們在元數據中添加特性比直接使用特性標註要晚,所以它們的優先順序很高,會覆蓋掉這裡使用的策略。建議不要針對所有 Controller 使用RequireRateLimiting
或DisableRateLimiting
。
下麵是一個應用示例:
[EnableRateLimiting("fixed")] // 針對整個 Controller 使用限流策略 fixed
public class WeatherForecastController : ControllerBase
{
// 會使用 Controller 類上標註的 fixed 限流策略
[HttpGet(Name = "GetWeatherForecast")]
public string Get() => "Get";
[HttpGet("Hello")]
[EnableRateLimiting("my_policy")] // 會使用 my_policy 限流策略,而不會使用 fixed
public string Hello() => "Hello";
[HttpGet("disable")]
[DisableRateLimiting] // 禁用任何限流策略
public string Disable() => "Disable";
}
設計原理
為了方便理解接下來的內容,先明確幾個容易混淆的類型的概念:
- RateLimitPartition
:限流分區, TKey
表示分區的 Key,被同一限流分區作用的請求會互相影響,不同限流分區則不影響。 - RateLimitPartition:非泛型的,它只是個靜態類,用來快速創建限流分區
RateLimitPartition<TKey>
- PartitionedRateLimiter
:分區限流器,即包含了限流分區的限流器,內部會使用各個限流分區對不同請求進行限流。 TResource
表示被限流的資源類型,比如 Http 請求類型為HttpContext
。限流中間件就是通過它來進行限流操作的。 - PartitionedRateLimiter:非泛型的,同樣只是個靜態類,用來快速創建分區限流器
PartitionedRateLimiter<TResource>
篇幅所限,下方示例列出的源碼會忽略一部分非核心代碼。
AddRateLimiter
AddRateLimiter
很簡單,只是單純的進行選項配置:
public static class RateLimiterServiceCollectionExtensions
{
public static IServiceCollection AddRateLimiter(this IServiceCollection services, Action<RateLimiterOptions> configureOptions)
{
services.Configure(configureOptions);
return services;
}
}
AddXXXLimiter
以下僅以AddFixedWindowLimiter
為例進行講解,其他三個都是類似的。
public static class RateLimiterOptionsExtensions
{
public static RateLimiterOptions AddFixedWindowLimiter(this RateLimiterOptions options, string policyName, Action<FixedWindowRateLimiterOptions> configureOptions)
{
var key = new PolicyNameKey() { PolicyName = policyName };
var fixedWindowRateLimiterOptions = new FixedWindowRateLimiterOptions();
configureOptions.Invoke(fixedWindowRateLimiterOptions);
fixedWindowRateLimiterOptions.AutoReplenishment = false;
return options.AddPolicy(policyName, context =>
{
return RateLimitPartition.GetFixedWindowLimiter(key,
_ => fixedWindowRateLimiterOptions);
});
}
}
首先是配置選項,可以看到它把AutoReplenishment
強制設置為了false
,不對啊,如果這樣設置豈不是要我來手動調用TryReplenish
來重置次數了。其實不然,我們一會看GetFixedWindowLimiter
的實現就知道原因了。
接著就是調用AddPolicy
,傳入策略名和一個委托來添加策略,該委托會返回一個限流分區,分區內可以通過工廠獲取限流器實例。可以看到該策略的分區 key 是固定不變的,即該策略共用一個限流分區。
public static class RateLimitPartition
{
public static RateLimitPartition<TKey> GetFixedWindowLimiter<TKey>(
TKey partitionKey,
Func<TKey, FixedWindowRateLimiterOptions> factory)
{
return Get(partitionKey, key =>
{
FixedWindowRateLimiterOptions options = factory(key);
if (options.AutoReplenishment is true)
{
options = new FixedWindowRateLimiterOptions
{
PermitLimit = options.PermitLimit,
QueueProcessingOrder = options.QueueProcessingOrder,
QueueLimit = options.QueueLimit,
Window = options.Window,
AutoReplenishment = false
};
}
return new FixedWindowRateLimiter(options);
});
}
public static RateLimitPartition<TKey> Get<TKey>(
TKey partitionKey,
Func<TKey, RateLimiter> factory)
=> new RateLimitPartition<TKey>(partitionKey, factory);
可以看到,如果AutoReplenishment
為true
,會重新new
一個新選項,這個新的選項僅僅是將AutoReplenishment
設置為false
。為什麼呢?這是因為如果它為true
,那麼每一個FixedWindowRateLimiters
實例(即限流分區)都會有一個自己的定時器來定時補充許可,這無疑是很浪費的。所以將它設置為false
,由分區限流器中的的定時器來統一管理其下的所有分區,降低資源消耗,不用擔心,微軟已經幫我們實現好了(具體在RateLimitingMiddleware
小節中會介紹),不需要自己實現。
策略被保存到RateLimiterOptions
的PolicyMap
和UnactivatedPolicyMap
中,其中:
PolicyMap
是指已經創建了創建了策略實例的限流策略集UnactivatedPolicyMap
是指還未創建策略實例的限流策略集,它保存的不是策略實例,而是創建策略的委托。這種一般是實現了IRateLimiterPolicy<TPartitionKey>
介面的策略,我們需要在運行時向它的構造函數註入一些參數。
它們倆都是用於提供限流策略,只不過前者已經構造好了實例,可以直接拿來用,後者則需要在運行時創建實例,然後才能用。
我們的固定視窗限流器策略顯然是存放到PolicyMap
中,:
public sealed class RateLimiterOptions
{
internal Dictionary<string, DefaultRateLimiterPolicy> PolicyMap { get; }
= new Dictionary<string, DefaultRateLimiterPolicy>(StringComparer.Ordinal);
internal Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>> UnactivatedPolicyMap { get; }
= new Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>>(StringComparer.Ordinal);
public RateLimiterOptions AddPolicy<TPartitionKey>(string policyName, Func<HttpContext, RateLimitPartition<TPartitionKey>> partitioner)
{
// 策略名不能重覆
if (PolicyMap.ContainsKey(policyName) || UnactivatedPolicyMap.ContainsKey(policyName))
{
throw ...;
}
PolicyMap.Add(policyName, new DefaultRateLimiterPolicy(ConvertPartitioner<TPartitionKey>(policyName, partitioner), null));
return this;
}
}
可以看到,承載策略的實例類型均為DefaultRateLimiterPolicy
,即使你是註冊的IRateLimiterPolicy<TPartitionKey>
類型的策略,最終也是會轉化為DefaultRateLimiterPolicy
。
RateLimiter
現在限流器實例的獲取方式已經知道了,那接下來詳細看一下FixedWindowRateLimiter
的詳細設計吧。
首先,所有限流器均繼承自抽象類RateLimiter
:
public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
public abstract RateLimiterStatistics? GetStatistics();
public abstract TimeSpan? IdleDuration { get; }
public RateLimitLease AttemptAcquire(int permitCount = 1)
=> AttemptAcquireCore(permitCount);
protected abstract RateLimitLease AttemptAcquireCore(int permitCount);
public ValueTask<RateLimitLease> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default)
=> AcquireAsyncCore(permitCount, cancellationToken);
protected abstract ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken);
}
其中,我們需要重點關註以下成員:
- IdleDuration:空閑周期,即該限流器有多長時間始終保持著最大可用許可數了。
- 例如,一個限流器在時間點A,重新發放了許可,沒有一個請求來獲取許可,那麼它的空閑周期就是
當前時間 - A
,當有請求獲取許可時,空閑周期就會被置為null
- 限流管理器會通過它來清理未使用的限流器
- 例如,一個限流器在時間點A,重新發放了許可,沒有一個請求來獲取許可,那麼它的空閑周期就是
- GetStatistics:獲取統計數據,主要包含當前有多少可用的許可、當前排隊的請求個數、許可出租成功的總次數以及許可出租失敗的總次數。
- AttemptAcquire:嘗試獲取許可,當請求獲取到許可時,則會被處理,否則會被限流拒絕。
- 它接收一個
permitCount
參數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,表示查看是否還能獲取到許可(不會消耗許可數)。 - 返回值類型
RateLimitLease
擁有一個bool IsAcquired
屬性,表示許可是否獲取成功
- 它接收一個
- AcquireAsync:非同步獲取許可,它會一直等待,直到成功獲取到許可,或者無法獲取足夠的許可(比如排隊隊列裝不下),才會返回結果。
- 它接收一個
permitCount
參數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,它會一直等待,直到可以獲取到許可,或者再也不能獲取到許可了(不會消耗許可數)。 - 同樣的,返回值類型
RateLimitLease
擁有一個bool IsAcquired
屬性,表示許可是否獲取成功
- 它接收一個
接著,對於FixedWindowLimiter
、SlidingWindowLimiter
和TokenBucketLimiter
來說,它們都是時間範圍的限流演算法,都具備Replenish
性質,所以又抽象出一層ReplenishingRateLimiter
:
public abstract class ReplenishingRateLimiter : RateLimiter
{
// 許可發放周期
public abstract TimeSpan ReplenishmentPeriod { get; }
// 是否自動補充許可
public abstract bool IsAutoReplenishing { get; }
// 嘗試補充許可
// 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
public abstract bool TryReplenish();
}
ConcurrencyLimiter
直接繼承自RateLimiter
最後具體看一下FixedWindowRateLimiter
的詳細實現,先來看構造函數以及一些常用屬性:
public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
{
// 用於重新補充許可的定時器
private readonly Timer? _renewTimer;
// 選項,會 clone 一份構造函數傳進來的 options
private readonly FixedWindowRateLimiterOptions _options;
// 指示許可租賃成功的結果
private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
// 指示許可租賃失敗的結果
private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);
// 空閑周期
public override TimeSpan? IdleDuration => ...;
// 是否自動補充許可
public override bool IsAutoReplenishing => _options.AutoReplenishment;
// 許可發放周期,對於固定視窗來說,就是視窗大小
public override TimeSpan ReplenishmentPeriod => _options.Window;
public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
{
// 省略部分代碼...
// 如果 AutoReplenishment == true,則會創建定時器,用於定時補充許可
// 不過我們從前面可以得知,傳遞到這裡的是 false,所以定時器並不會被創建
if (_options.AutoReplenishment)
{
_renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
}
}
}
接下來是補充許可TryReplenish
的實現:
public override bool TryReplenish()
{
// 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
if (_options.AutoReplenishment)
{
return false;
}
Replenish(this);
return true;
}
private static void Replenish(object? state)
{
FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;
// 獲取當前時間
long nowTicks = Stopwatch.GetTimestamp();
limiter!.ReplenishInternal(nowTicks);
}
private void ReplenishInternal(long nowTicks)
{
// 如果當前時間距離上次許可發放時間還沒達到視窗大小,則直接返回
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
{
return;
}
int availablePermitCounters = _permitCount;
if (availablePermitCounters >= _options.PermitLimit)
{
// 如果當前可用許可數 >= 限流器配置的最大許可數,則無須重新發放,直接返回
return;
}
// 補充許可
_permitCount = _options.PermitLimit;
// 先處理排隊的請求
while (_queue.Count > 0)
{
// 根據 QueueProcessingOrder 從隊列中找到(Peek)最老或最新的請求
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();
// 若請求已完成處理,則只需要將它移出隊列(Dequeue),並釋放資源即可。
// 請求已完成可能的原因如下:
// 1. 已被取消
// 2. 當 QueueProcessingOrder 設置為 NewestFirst 時,新來的請求把老的踢出了隊列
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
// 若可用的許可數足夠,則從隊列中取出請求並處理
else if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
// 扣減
_queueCount -= nextPendingRequest.Count;
_permitCount -= nextPendingRequest.Count;
// 向請求補充許可
// 若發放失敗,這還原扣減
if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
{
_permitCount += nextPendingRequest.Count;
_queueCount += nextPendingRequest.Count;
}
// 釋放資源
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
// 請求無法被處理,直接跳出
break;
}
}
if (_permitCount == _options.PermitLimit)
{
// 當可用許可數等於配置的最大許可數,則開始計算空閑周期
_idleSince = Stopwatch.GetTimestamp();
}
}
下麵一起看一下許可是如何租出去的。由於非同步的AcquireAsyncCore
基本包含了同步的AttemptAcquireCore
的處理邏輯,所以下麵就只看AcquireAsyncCore
。需要著重說一下的是,同步的AttemptAcquireCore
是不會進行入隊操作的。
源碼裡面其實有很多鎖,為了便於理解我都刪除了。
protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
// 當申請的許可數 == 0,並且可用許可數 > 0 時,則直接返回 SuccessfulLease,表示限流器還有可用許可
// 對於同步的 AttemptAcquireCore 方法來說,若此時可用許可數為 0,則會直接返回 FailedLease,表示限流器沒有可用許可
if (permitCount == 0 && _permitCount > 0)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
// 嘗試租賃
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
return new ValueTask<RateLimitLease>(lease);
}
// 如果隊列裝不下要申請許可的所有請求
if (_options.QueueLimit - _queueCount < permitCount)
{
// 如果優先處理新來的,並且要申請許可的請求數沒有超過隊列的大小限制,
// 則將隊列中老的請求踢出隊列,直到為新來的請求留出足夠的空間,準備將新來的請求加進去
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
{
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
// 設置老請求申請許可失敗
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
_queueCount += oldestRequest.Count;
}
oldestRequest.CancellationTokenRegistration.Dispose();
}
while (_options.QueueLimit - _queueCount < permitCount);
}
else
{
// 如果優先處理後來的,則只能返回 失敗
return new ValueTask<RateLimitLease>(CreateFailedWindowLease(permitCount));
}
}
// 這部分代碼不用太在意
CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}
RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr);
// 將新請求加入到隊尾
_queue.EnqueueTail(registration);
_queueCount += permitCount;
// 非同步可等待,直到 Task 執行完成獲取到結果(可能是申請成功,也可能是失敗)
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}
TryLeaseUnsynchronized
具體邏輯如下:
private bool TryLeaseUnsynchronized(int permitCount, out RateLimitLease? lease)
{
// 若可用的許可數足夠,且不為 0
if (_permitCount >= permitCount && _permitCount != 0)
{
// 租賃的許可為0,則直接返回 成功
if (permitCount == 0)
{
lease = SuccessfulLease;
return true;
}
// 若:
// 1. 沒有排隊的請求
// 2. 或有排隊的請求,但是 QueueProcessingOrder 被設置為 NewestFirst
// 則租賃成功,其他則租賃失敗(因為要先把排隊的處理完)
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
// 許可租賃出去了,也就表示該限流器不空閑了
_idleSince = null;
_permitCount -= permitCount;
lease = SuccessfulLease;
return true;
}
}
// 租賃失敗
lease = null;
return false;
}
RateLimitingMiddleware
現在,我們已經掌握了限流器補充許可和租賃許可的細節邏輯了,並且也得知並沒有使用限流器內部的定時器去定時補充許可,那這是由誰補充的呢?又是由誰為請求申請的許可呢?
沒錯,這都是RateLimitingMiddleware
負責的。
在構造方法中,我們需要重點關註下CreateEndpointLimiter
,它創建了終結點分區限流器,與全局限流器一起提供限流服務。
internal sealed partial class RateLimitingMiddleware
{
// 預設被限流拒絕回調的委托,取自 options.OnRejected
private readonly Func<OnRejectedContext, CancellationToken, ValueTask>? _defaultOnRejected;
// 全局限流器,取自 options.GlobalLimiter
private readonly PartitionedRateLimiter<HttpContext>? _globalLimiter;
// 終結點限流器
private readonly PartitionedRateLimiter<HttpContext> _endpointLimiter;
// 限流響應狀態碼,取自 options.RejectionStatusCode
private readonly int _rejectionStatusCode;
// 限流策略集,取自 options.PolicyMap 和 options.UnactivatedPolicyMap
private readonly Dictionary<string, DefaultRateLimiterPolicy> _policyMap;
public RateLimitingMiddleware(RequestDelegate next, ILogger<RateLimitingMiddleware> logger, IOptions<RateLimiterOptions> options, IServiceProvider serviceProvider, RateLimitingMetrics metrics)
{
// ...省略一堆代碼
_endpointLimiter = CreateEndpointLimiter();
}
}
在CreateEndpointLimiter
方法中,創建了分區限流器,裡面包含了各種各樣的限流分區,用於不同終結點請求的限流。
private PartitionedRateLimiter<HttpContext> CreateEndpointLimiter()
{
// 創建分區限流器
return PartitionedRateLimiter.Create<HttpContext, DefaultKeyType>(context =>
{
DefaultRateLimiterPolicy? policy;
var enableRateLimitingAttribute = context.GetEndpoint()?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果不需要限流,則返回 NoLimiter
if (enableRateLimitingAttribute is null)
{
return RateLimitPartition.GetNoLimiter<DefaultKeyType>(_defaultPolicyKey);
}
// 根據限流策略取限流分區
policy = enableRateLimitingAttribute.Policy;
if (policy is not null)
{
return policy.GetPartition(context);
}
var name = enableRateLimitingAttribute.PolicyName;
if (name is not null)
{
if (_policyMap.TryGetValue(name, out policy))
{
return policy.GetPartition(context);
}
else
{
throw new InvalidOperationException($"This endpoint requires a rate limiting policy with name {name}, but no such policy exists.");
}
}
// 雖然策略名或策略不可能為空,但是加一下判斷更好
else
{
throw new InvalidOperationException("This endpoint requested a rate limiting policy with a null name.");
}
}, new DefaultKeyTypeEqualityComparer());
}
咦?怎麼還是沒看到在哪自動補充的許可?實際上它就隱藏在PartitionedRateLimiter.Create
中的DefaultPartitionedRateLimiter
裡面,藏得太深了:
public static class PartitionedRateLimiter
{
public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
{
return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
}
}
下麵是DefaultPartitionedRateLimiter
啟動定時器執行心跳的核心代碼:
internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
// 限流器集合
private readonly Dictionary<TKey, Lazy<RateLimiter>> _limiters;
// 限流分區委托,可通過資源獲取到分區
private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
// 定時器,主要作用是每 100ms 進行一次心跳,即執行 Heartbeat 方法
private readonly TimerAwaitable _timer;
private readonly Task _timerTask;
public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
IEqualityComparer<TKey>? equalityComparer = null)
{
_limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
_partitioner = partitioner;
var timerInterval = TimeSpan.FromMilliseconds(100);
_timer = new TimerAwaitable(timerInterval, timerInterval);
_timerTask = RunTimer();
}
private async Task RunTimer()
{
_timer.Start();
// 只要 timer 不被停止,則一直返回 true,即 timer 仍在運行中
while (await _timer)
{
try
{
await Heartbeat().ConfigureAwait(false);
}
catch { }
}
_timer.Dispose();
}
}
TimerAwaitable
是一個可非同步等待的類型(實現了GetAwaiter
、INotifyCompletion
、IsCompleted
和GetResult
),內部設計非常有意思。在它內部,啟動了一個定時器,每 100ms(傳入的timerInterval) Tick 一次,每次 Tick 就會把 IsCompleted
設置為true
,將任務狀態切換為已完成。外部通過await
獲取結果時(靜默調用GetResult
),又會將IsCompleted
設置為false
,再將其轉換為未完成狀態。外部再配合while
以達到定時執行的效果。
為什麼不直接用Timer
而又弄出一個TimerAwaitable
?我認為TimerAwaitable
有以下優點:
- 優雅的書寫非同步代碼
- 它的定時執行不會出現重入。即不會因為上一次定時任務執行耗時超過定時間隔還未完成,這一次又執行了定時任務,導致同時有兩個甚至多個線程在執行定時任務。
通過定時器,每 100ms 執行一次心跳,心跳過程中檢查各個限流器是否需要補充許可,如果需要,則補充,並回收空閑限流器等。以下是簡化的心跳邏輯:
private async Task Heartbeat()
{
if (_cacheInvalid)
{
_cachedLimiters.Clear();
_cachedLimiters.AddRange(_limiters);
}
// 遍歷所有緩存的限流器
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
{
// 如果限流器還未被實例化,則跳過
if (!rateLimiter.Value.IsValueCreated) continue;
// 如果限流器空閑周期超過了空閑時間限制(預設10s),則回
if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > s_idleTimeLimit)
{
lock (Lock)
{
// 雙重檢測,確保限流器確實是空閑的
idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
if (idleDuration > s_idleTimeLimit)
{
_cacheInvalid = true;
// 回收該限流器
_limiters.Remove(rateLimiter.Key);
// 保存下來,後面一起釋放資源
_limitersToDispose.Add(rateLimiter.Value.Value);
}
}
}
// 如果限流器可補充許可,則嘗試補充
else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
{
try
{
replenishingRateLimiter.TryReplenish();
}
catch (Exception ex) { ... }
}
}
// 釋放回收的限流器資源
foreach (RateLimiter limiter in _limitersToDispose)
{
try
{
await limiter.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex) { ... }
}
_limitersToDispose.Clear();
}
好了,我們已經瞭解了限流器的管理,讓我們再次回到RateLimitingMiddleware
,看看他是如何工作的吧:
public Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint();
// 如果終結點包含禁用限流標記,則不限流
if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
{
return _next(context);
}
var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果終結點沒有啟用限流標記,並且全局限流器也是空的,則同樣不限流
if (enableRateLimitingAttribute is null && _globalLimiter is null)
{
return _next(context);
}
return InvokeInternal(context, enableRateLimitingAttribute);
}
private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
{
var policyName = enableRateLimitingAttribute?.PolicyName;
// 嘗試獲取許可
using var leaseContext = await TryAcquireAsync(context);
// 如果獲取到了許可,則處理請求
if (leaseContext.Lease?.IsAcquired == true)
{
await _next(context);
}
// 沒有獲取到許可,則限流拒絕
else
{
// 如果請求是被取消的,則不要執行 OnRejected 回調,應該直接返回
if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
{
return;
}
var thisRequestOnRejected = _defaultOnRejected;
context.Response.StatusCode = _rejectionStatusCode;
// 如果請求是被終結點限流器限流拒絕的
if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
{
// 若策略有自己的 OnRejected,則使用策略的,如果沒有,則使用 _defaultOnRejected
// 這裡我感覺是個 bug,應該判斷 policy?.OnRejected is not null 才賦值
DefaultRateLimiterPolicy policy = enableRateLimitingAttribute?.Policy;
if (policy is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
else
{
// 對於策略名,當 OnRejected 不為空時,才使用策略的 OnRejected
if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
}
}
// 執行回調
if (thisRequestOnRejected is not null)
{
await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
}
}
}
TryAcquireAsync
會先從全局限流器獲取許可,如果獲取到了,則會繼續在終結點限流器中獲取許可,如果獲取到了,請求才會被處理:
非同步的
CombinedWaitAsync
與同步的CombinedAcquire
類似,只不過前面調用的是非同步方法,後面是同步,故下方僅列出CombinedAcquire
簡化源碼。
private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
{
// 組合獲取,即按順序從全局限流器和終結點限流器中獲取許可
var leaseContext = CombinedAcquire(context);
// 如果獲取到了,則直接返回
if (leaseContext.Lease?.IsAcquired == true)
{
return leaseContext;
}
// 非同步等待再次獲取許可
return await CombinedWaitAsync(context, context.RequestAborted);
}
private LeaseContext CombinedAcquire(HttpContext context)
{
// 全局限流器不為空,則先從其中獲取許可
if (_globalLimiter is not null)
{
var globalLease = _globalLimiter.AttemptAcquire(context);
// 未獲取許可,直接返回
if (!globalLease.IsAcquired)
{
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
}
}
// 從終結點限流器中獲取許可
var endpointLease = _endpointLimiter.AttemptAcquire(context);
// 未獲取許可,直接返回
if (!endpointLease.IsAcquired)
{
globalLease?.Dispose();
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
}
return globalLease is null
? new LeaseContext() { Lease = endpointLease }
: new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
}
總結
- ASP.NET Core 為我們提供了限流功能,通過
AddRateLimiter
註冊限流服務,通過UseRateLimiter
啟用限流功能。 - 預設提供了4種限流演算法,分別是:
- FixedWindowLimiter:固定視窗限流器
- SlidingWindowLimiter:滑動視窗限流器
- TokenBucketLimiter:令牌桶限流器
- ConcurrencyLimiter:併發限流器
- 可以通過
options.AddPolicy
添加限流策略,作用於某些終結點,這些策略最終組成的分區限流器稱為終結點限流器- 可以通過
options.AddXXXLimiter
的方式快捷添加限流策略 - 也可以自定義限流策略,限流邏輯可以通過委托直接傳入,也可以通過實現介面
IRateLimiterPolicy<TPartitionKey>
- 可以通過
- 可以通過
options.GlobalLimiter
設置全局限流器,當請求進入應用時,會先執行全局限流器,再執行終結點限流器。 - 可以通過
options.RejectionStatusCode
設置限流拒絕的響應狀態碼,還可以通過OnRejected
編寫更多的響應邏輯。 - 可以通過
PartitionedRateLimiter.CreateChained
將多個分區限流器進行鏈式組合 - 目前 ASP.NET Core 提供的限流功能還不夠成熟,例如終結點限流器無法進行鏈式組合、無法為終結點設置多個限流策略等