導讀 前幾天和一個朋友討論了他們公司的系統問題,傳統的單體應用,集群部署,他說近期服務的併發量可能會出現瞬時增加的風險,雖然部署了集群,但是通過壓測後發現請求延遲仍然是很大,想問問我有什麼改進的地方。我沉思了一會,現在去改架構顯然是不可能的,於是我給出了一個建議,讓他去做個介面限流,這樣能夠保證瞬時 ...
導讀
- 前幾天和一個朋友討論了他們公司的系統問題,傳統的單體應用,集群部署,他說近期服務的併發量可能會出現瞬時增加的風險,雖然部署了集群,但是通過壓測後發現請求延遲仍然是很大,想問問我有什麼改進的地方。我沉思了一會,現在去改架構顯然是不可能的,於是我給出了一個建議,讓他去做個介面限流,這樣能夠保證瞬時併發量飆高也不會出現請求延遲的問題,用戶的體驗度也會上去。
- 至於什麼是介面限流?怎麼實現介面限流?如何實現單機應用的限流?如何實現分散式應用的限流?本篇文章將會詳細闡述。
限流的常見幾種演算法
- 常見的限流演算法有很多,但是最常用的演算法無非以下四種。
固定視窗計數器
- 固定演算法的概念如下
- 將時間劃分為多個視窗
- 在每個視窗內每有一次請求就將計數器加一
- 如果計數器超過了限制數量,則本視窗內所有的請求都被丟棄當時間到達下一個視窗時,計數器重置。
- 固定視窗計數器是最為簡單的演算法,但這個演算法有時會讓通過請求量允許為限制的兩倍。考慮如下情況:限制 1 秒內最多通過 5 個請求,在第一個視窗的最後半秒內通過了 5 個請求,第二個視窗的前半秒內又通過了 5 個請求。這樣看來就是在 1 秒內通過了 10 個請求。
滑動視窗計數器
- 滑動視窗計數器演算法概念如下:
- 將時間劃分為多個區間;
- 在每個區間內每有一次請求就將計數器加一維持一個時間視窗,占據多個區間;
- 每經過一個區間的時間,則拋棄最老的一個區間,並納入最新的一個區間;
- 如果當前視窗內區間的請求計數總和超過了限制數量,則本視窗內所有的請求都被丟棄。
- 滑動視窗計數器是通過將視窗再細分,並且按照時間 " 滑動 ",這種演算法避免了固定視窗計數器帶來的雙倍突發請求,但時間區間的精度越高,演算法所需的空間容量就越大。
漏桶演算法
- 漏桶演算法概念如下:
- 將每個請求視作 " 水滴 " 放入 " 漏桶 " 進行存儲;
- “漏桶 " 以固定速率向外 " 漏 " 出請求來執行如果 " 漏桶 " 空了則停止 " 漏水”;
- 如果 " 漏桶 " 滿了則多餘的 " 水滴 " 會被直接丟棄。
- 漏桶演算法多使用隊列實現,服務的請求會存到隊列中,服務的提供方則按照固定的速率從隊列中取出請求並執行,過多的請求則放在隊列中排隊或直接拒絕。
- 漏桶演算法的缺陷也很明顯,當短時間內有大量的突發請求時,即便此時伺服器沒有任何負載,每個請求也都得在隊列中等待一段時間才能被響應。
令牌桶演算法
- 令牌桶演算法概念如下:
- 令牌以固定速率生成。
- 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多餘的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執行。
- 如果桶空了,那麼嘗試取令牌的請求會被直接丟棄。
- 令牌桶演算法既能夠將所有的請求平均分佈到時間區間內,又能接受伺服器能夠承受範圍內的突發請求,因此是目前使用較為廣泛的一種限流演算法。
單體應用實現
- 在傳統的單體應用中限流只需要考慮到多線程即可,使用Google開源工具類guava即可。其中有一個RateLimiter專門實現了單體應用的限流,使用的是令牌桶演算法。
- 單體應用的限流不是本文的重點,官網上現成的API,讀者自己去看看即可,這裡不再詳細解釋。
分散式限流
- 分散式限流和熔斷現在有很多的現成的工具,比如Hystrix,Sentinel 等,但是還是有些企業不引用外來類庫,因此就需要自己實現。
- Redis作為單線程多路復用的特性,很顯然能夠勝任這項任務。
Redis如何實現
- 使用令牌桶的演算法實現,根據前面的介紹,我們瞭解到令牌桶演算法的基礎需要兩個個變數,分別是桶容量,產生令牌的速率。
- 這裡我們實現的就是每秒產生的速率加上一個桶容量。但是如何實現呢?這裡有幾個問題。
需要保存什麼數據在redis中?
- 當前桶的容量,最新的請求時間
- 以什麼數據結構存儲?
- 因為是針對介面限流,每個介面的業務邏輯不同,對併發的處理也是不同,因此要細化到每個介面的限流,此時我們選用HashMap的結構,hashKey是介面的唯一id,可以是請求的uri,裡面的分別存儲當前桶的容量和最新的請求時間。
- 如何計算需要放令牌?
- 根據redis保存的上次的請求時間和當前時間比較,如果相差大於的產生令牌的時間(陳某實現的是1秒)則再次產生令牌,此時的桶容量為當前令牌+產生的令牌
- 如何保證redis的原子性?
- 保證redis的原子性,使用lua腳本即可解決。
有了上述的幾個問題,便能很容易的實現。
開擼
1、lua腳本如下:
local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')
local last_time = ratelimit_info[1]
local current_token = tonumber(ratelimit_info[2])
local max_token = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
if current_token == nil then
current_token = max_token
last_time = current_time
else
local past_time = current_time-last_time
if past_time>1000 then
current_token = current_token+token_rate
last_time = current_time
end
## 防止溢出
if current_token>max_token then
current_token = max_token
last_time = current_time
end
end
local result = 0
if(current_token>0) then
result = 1
current_token = current_token-1
last_time = current_time
end
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
return result
- 調用lua腳本出四個參數,分別是介面方法唯一id,桶容量,每秒產生令牌的數量,當前請求的時間戳。
2、 SpringBoot代碼實現
- 採用Spring-data-redis實現lua腳本的執行。
- Redis序列化配置:
/**
* 重新註入模板
*/
@Bean(value = "redisTemplate")
@Primary
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//設置序列化方式,key設置string 方式,value設置成json
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
jsonRedisSerializer.setObjectMapper(objectMapper);
template.setEnableDefaultSerializer(false);
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer);
return template;
}
- 限流工具類
/**
* @Description 限流工具類
* @Author CJB
* @Date 2020/3/19 17:21
*/
public class RedisLimiterUtils {
private static StringRedisTemplate stringRedisTemplate=ApplicationContextUtils.applicationContext.getBean(StringRedisTemplate.class);
/**
* lua腳本,限流
*/
private final static String TEXT="local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')\n" +
"local last_time = ratelimit_info[1]\n" +
"local current_token = tonumber(ratelimit_info[2])\n" +
"local max_token = tonumber(ARGV[1])\n" +
"local token_rate = tonumber(ARGV[2])\n" +
"local current_time = tonumber(ARGV[3])\n" +
"if current_token == nil then\n" +
" current_token = max_token\n" +
" last_time = current_time\n" +
"else\n" +
" local past_time = current_time-last_time\n" +
" \n" +
" if past_time>1000 then\n" +
"\t current_token = current_token+token_rate\n" +
"\t last_time = current_time\n" +
" end\n" +
"\n" +
" if current_token>max_token then\n" +
" current_token = max_token\n" +
"\tlast_time = current_time\n" +
" end\n" +
"end\n" +
"\n" +
"local result = 0\n" +
"if(current_token>0) then\n" +
" result = 1\n" +
" current_token = current_token-1\n" +
" last_time = current_time\n" +
"end\n" +
"redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)\n" +
"return result";
/**
* 獲取令牌
* @param key 請求id
* @param max 最大能同時承受多少的併發(桶容量)
* @param rate 每秒生成多少的令牌
* @return 獲取令牌返回true,沒有獲取返回false
*/
public static boolean tryAcquire(String key, int max,int rate) {
List<String> keyList = new ArrayList<>(1);
keyList.add(key);
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setResultType(Long.class);
script.setScriptText(TEXT);
return Long.valueOf(1).equals(stringRedisTemplate.execute(script,keyList,Integer.toString(max), Integer.toString(rate),
Long.toString(System.currentTimeMillis())));
}
}
- 採用攔截器+註解的方式實現,註解如下:
/**
* @Description 限流的註解,標註在類上或者方法上。在方法上的註解會覆蓋類上的註解,同@Transactional
* @Author CJB
* @Date 2020/3/20 13:36
*/
@Inherited
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
/**
* 令牌桶的容量,預設100
* @return
*/
int capacity() default 100;
/**
* 每秒鐘預設產生令牌數量,預設10個
* @return
*/
int rate() default 10;
}
- 攔截器如下:
/**
* @Description 限流的攔器
* @Author CJB
* @Date 2020/3/19 14:34
*/
@Component
public class RateLimiterIntercept implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod){
HandlerMethod handlerMethod=(HandlerMethod)handler;
Method method = handlerMethod.getMethod();
/**
* 首先獲取方法上的註解
*/
RateLimit rateLimit = AnnotationUtils.findAnnotation(method, RateLimit.class);
//方法上沒有標註該註解,嘗試獲取類上的註解
if (Objects.isNull(rateLimit)){
//獲取類上的註解
rateLimit = AnnotationUtils.findAnnotation(handlerMethod.getBean().getClass(), RateLimit.class);
}
//沒有標註註解,放行
if (Objects.isNull(rateLimit))
return true;
//嘗試獲取令牌,如果沒有令牌了
if (!RedisLimiterUtils.tryAcquire(request.getRequestURI(),rateLimit.capacity(),rateLimit.rate())){
//拋出請求超時的異常
throw new TimeOutException();
}
}
return true;
}
}
SpringBoot配置攔截器的代碼就不貼了,以上就是完整的代碼,至此分散式限流就完成了。
如果覺得作者寫的好,有所收穫的話,點個關註推薦一下喲!!!