現在,因為種種因素,你必須對一個請求或者方法進行頻率上的訪問限制。 比如, 你對外提供了一個API介面,註冊用戶每秒鐘最多可以調用100次,非註冊用戶每秒鐘最多可以調用10次。 比如, 有一個非常吃伺服器資源的方法,在同一時刻不能超過10個人調用這個方法,否則伺服器滿載。 比如, 有一些特殊的頁面, ...
現在,因為種種因素,你必須對一個請求或者方法進行頻率上的訪問限制。
比如, 你對外提供了一個API介面,註冊用戶每秒鐘最多可以調用100次,非註冊用戶每秒鐘最多可以調用10次。
比如, 有一個非常吃伺服器資源的方法,在同一時刻不能超過10個人調用這個方法,否則伺服器滿載。
比如, 有一些特殊的頁面,訪客並不能頻繁的訪問或發言。
比如, 秒殺活動等進行。
比如 ,防範DDOS,當達到一定頻率後調用腳本iis伺服器ip黑名單,防火牆黑名單。
如上種種的舉例,也就是說,如何從一個切麵的角度對調用的方法進行頻率上的限制。而對頻率限制,伺服器層面都有最直接的解決方法,現在我說的則是代碼層面上的頻率管控。
本文給出兩個示例,一個是基於單機環境的實現,第二個則是基於分散式的Redis實現。
--------------------
以第一個API介面需求為例,先說下單機環境下的實現。
按照慣性思維,我們自然會想到緩存的過期策略這種方法,但是嚴格來講就HttpRuntime.Cache而言,通過緩存的過期策略來對請求進行頻率的併發控制是不合適的。
HttpRuntime.Cache 是應用程式級別的Asp.Net的緩存技術,通過這個技術可以申明多個緩存對象,可以為每個對象設置過期時間,當過期時間到達後該緩存對象就會消失(也就是當你訪問該對象的時候為Null)
為什麼這樣說呢?比如對某個方法(方法名:GetUserList)我們要進行1秒鐘最多10次的限制,現在我們就新建一個int型的Cache對象,然後設置1秒鐘後過期消失。那麼每當訪問GetUserList方法前,我們就先判斷這個Cache對象的值是否大於10,如果大於10就不執行GetUserList方法,如果小於10則允許執行。每當訪問該對象的時候如果不存在或者過期就新建,這樣周而複始,則該對象永遠不可能超過10。
1 if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大於10請求失敗 2 { 3 Console.WriteLine("禁止請求"); 4 } 5 else 6 { 7 HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //否則該緩存對象的值+1 8 Console.WriteLine("允許請求"); 9 }
這樣的思想及實現相對來說非常簡單,但是基於這樣的一個模型設定,那麼就會出現這種情況:
如上圖,每個點代表一次訪問請求,我在0秒的時候 新建了一個名字為GetUserListNum的緩存對象。
在0~0.5秒期間 我訪問了3次,在0.5~1秒期間,我們訪問了7次。此時,該對象消失,然後我們接著訪問,該對象重置為0.
在第1~1.5秒期間,還是訪問了7次,在第1.5秒~2秒期間訪問了3次。
基於這種簡單緩存過期策略的模型,在這2秒鐘內,我們雖然平均每秒鐘都訪問了10次,滿足這個規定,但是如果我們從中取一個期間段,0.5秒~1.5秒期間,也是1秒鐘,但是卻實實在在的訪問了14次!遠遠超過了我們設置的 1秒鐘最多訪問10次的 限制。
那麼如何科學的來解決上面的問題呢?我們可以通過模擬會話級別的信號量這一手段,這也就是我們今天的主題了。
什麼是信號量?僅就以代碼而言, static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5); 它的意思就代表在多線程情況下,在任何一時刻,只能同時5個線程去訪問。
4容器4線程模型
現在,在實現代碼的之前我們先設計一個模型。
假設我們有一個用戶A的管道,這個管道里裝著用戶A的請求,比如用戶A在一秒鐘發出了10次請求,那麼每一個請求過來,管道里的元素都會多一個。但是我們設定這個管道最多只能容納10個元素,而且每個元素的存活期為1秒,1秒後則該元素消失。那麼這樣設計的話,無論是速率還是數量的突進,都會有管道長度的限制。這樣一來,無論從哪一個時間節點或者時間間隔出發,這個管道都能滿足我們的頻率限制需求。
而這裡的管道,就必須和會話Id來對應了。每當有新會話進來的時候就生成一個新管道。這個會話id根據自己場景所定,可以是sessionId,可以是ip,也可以是token。
那麼既然這個管道是會話級別的,我們肯定得需要一個容器,來裝這些管道。現在,我們以IP來命名會話管道,並把所有的管道都裝載在一個容器中,如圖
而基於剛纔的設定,我們還需要對容器內的每條管道的元素進行處理,把過期的給剔除掉,為此,還需要單獨為該容器開闢出一個線程來為每條管道進行元素的清理。而當管道的元素為0時,我們就清掉該管道,以便節省容器空間。
當然,由於用戶量多,一個容器內可能存在上萬個管道,這個時候僅僅用一個容器來裝載來清理,在效率上顯然是不夠的。這個時候,我們就得對容器進行橫向擴展了。
比如,我們可以根據Cpu核心數自動生成對應的數量的容器,然後根據一個演算法,對IP來進行導流。我當前cpu是4個邏輯核心,就生成了4個容器,每當用戶訪問的時候,都會最先經過一個演算法,這個演算法會對IP進行處理,如192.168.1.11~192.168.1.13這個Ip段進第一個容器,xxx~xxx進第二個容器,依次類推,相應的,也就有了4個線程去分別處理4個容器中的管道。
那麼,最終就形成了我們的4容器4線程模型了。
現在,著眼於編碼實現:
首先我們需要一個能承載這些容器的載體,這個載體類似於連接池的概念,可以根據一些需要自動生成適應數量的容器,如果有特殊要求的話,還可以在容器上切出一個容器管理的面,線上程上切出一個線程管理的面以便於實時監控和調度。如果真要做這樣一個系統,那麼 容器的調度 和 線程的調度功能 是必不可少的,而本Demo則是完成了主要功能,像容器和線程在代碼中我也沒剝離開來,演算法也是直接寫死的,實際設計中,對演算法的設計還是很重要的,還有多線程模型中,怎樣上鎖才能讓效率最大化也是重中之重的。
而這裡為了案例的直觀就直接寫死成4個容器。
public static List<Container> ContainerList = new List<Container>(); //容器載體 static Factory() { for (int i = 0; i < 4; i++) { ContainerList.Add(new Container(i)); //遍歷4次 生成4個容器 } foreach (var item in ContainerList) { item.Run(); //開啟線程 } }
現在,我們假定 有編號為 0 到 40 這樣的 41個用戶。那麼這個導流演算法 我也就直接寫死,編號0至9的用戶 將他們的請求給拋轉到第一個容器,編號10~19的用戶 放到第二個容器,編號20~29放到第三個容器,編號30~40的用戶放到第四個容器。
那麼這個代碼就是這樣的:
static Container GetContainer(int userId, out int i) //獲取容器的演算法 { if (0 <= userId && userId < 10) //編號0至9的用戶 返回第一個容器 依次類推 { i = 0; return ContainerList[0]; } if (10 <= userId && userId < 20) { i = 1; return ContainerList[1]; } if (20 <= userId && userId < 30) { i = 2; return ContainerList[2]; } i = 3; return ContainerList[3]; }
當我們的會話請求經過演算法的導流之後,都必須調用一個方法,用於辨別管道數量。如果管道數量已經大於10,則請求失敗,否則成功
public static void Add(int userId) { if (GetContainer(userId, out int i).Add(userId)) Console.WriteLine("容器" + i + " 用戶" + userId + " 發起請求"); else Console.WriteLine("容器" + i + " 用戶" + userId + " 被攔截"); }
接下來就是容器Container的代碼了。
這裡,對容器的選型用線程安全的ConcurrentDictionary類。
線程安全:當多個線程同時讀寫同一個共用元素的時候,就會出現數據錯亂,迭代報錯等安全問提
ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0專為解決Dictionary線程安全而出的新類型
ReaderWriterLockSlim:較ReaderWriterLock優化的讀寫鎖,多個線程同時訪問讀鎖 或 一個線程訪問寫鎖
private ReaderWriterLockSlim obj = new ReaderWriterLockSlim(); //在每個容器中申明一個讀寫鎖 public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //創建該容器 dic
然後當你向容器添加一條管道中的數據是通過這個方法:
public bool Add(int userId) { obj.EnterReadLock();//掛讀鎖,允許多個線程同時寫入該方法 try { ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(), new ConcurrentList<DateTime>()); //如果不存在就新建 ConcurrentList return dtList.CounterAdd(10, DateTime.Now); //管道容量10,當臨界管道容量後 返回false } finally { obj.ExitReadLock(); } }
這裡,為了在後面的線程遍歷刪除ConcurrentList的管道的時候保證ConcurrentList的安全性,所以此處要加讀鎖。
而ConcurrentList,因為.Net沒有推出List集合類的線程安全,所以自己新建了一個繼承於List<T>的安全類型,在這裡 封裝了3個需要使用的方法。
public class ConcurrentList<T> : List<T> { private object obj = new object(); public bool CounterAdd(int num, T value) { lock (obj) { if (base.Count >= num) return false; else base.Add(value); return true; } } public new bool Remove(T value) { lock (obj) { base.Remove(value); return true; } } public new T[] ToArray() { lock (obj) { return base.ToArray(); } } }
最後就是線程的運行方法:
public void Run() { ThreadPool.QueueUserWorkItem(c => { while (true) { if (dic.Count > 0) { foreach (var item in dic.ToArray()) { ConcurrentList<DateTime> list = item.Value; foreach (DateTime dt in list.ToArray()) { if (DateTime.Now.AddSeconds(-3) > dt) { list.Remove(dt); Console.WriteLine("容器" + seat + " 已刪除用戶" + item.Key + "管道中的一條數據"); } } if (list.Count == 0) { obj.EnterWriteLock(); try { if (list.Count == 0) { if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i)) { Console.WriteLine("容器" + seat + " 已清除用戶" + item.Key + "的List管道"); } } } finally { obj.ExitWriteLock(); } } } } else { Thread.Sleep(100); } } } ); }
最後,是效果圖,一個是基於控制台的,還一個是基於Signalr的。
分散式下Redis
上面介紹了一種頻率限制的模型,分散式與單機相比,無非就是載體不同,我們只要把這個容器的載體從程式上移植出來,來弄成一個單獨的服務或者直接借用Redis也是可行的。
這裡就介紹分散式情況下,Redis的實現。
不同於Asp.Net的多線程模型,大概因為Redis的各種類型的元素非常粒度的操作導致各種加鎖的複雜性,所以在網路請求處理這塊Redis是單線程的,基於Redis的實現則因為單線程的緣故在編碼角度不用太多考慮到與邏輯無關的問題。
簡單介紹下,Redis是一個記憶體資料庫,這個資料庫屬於非關係型資料庫,它的概念不同於一般的我們認知的Mysql Oracle SqlServer關係型資料庫,它沒有Sql沒有欄位名沒有表名這些概念,它和HttpRunTime.Cache的概念差不多一樣,首先從操作上屬於鍵值對模式,就如 Cache["鍵名"] 這樣就能獲取到值類似,而且可以對每個Key設置過期策略,而Redis中的Key所對應的值並不是想存啥就存啥的,它支持五種數據類型:string(字元串),hash(哈希),list(列表),set(集合)及sorted set(有序集合)。
今天要說的是Sorted set有序集合,有序集合相比其它的集合類型的特殊點在於,使用有序集合的時候還能給插入的元素指定一個 積分score,我們把這個積分score理解為排序列,它內部會對積分進行排序,積分允許重覆,而有序集合中的元素則是唯一。
還是同樣的思路,每當有用戶訪問的時候,都對該用戶的 管道(有序集合)中添加一個元素,然後設置該元素的積分為當前時間。接著在程式中開個線程,來對管道中積分小於約定時間的元素進行清理。因為規定有序集合中的元素只能是唯一值,所以在賦值方面只要是滿足uuid即可。
那麼用Redis來實現的代碼那就是類似這種:
通過using語法糖實現IDisposable而包裝的Redis分散式鎖,然後裡面正常的邏輯判斷。
這樣的代碼雖然也能完成功能,但不夠友好。Redis是個基於記憶體的資料庫,於性能而言,瓶頸在於網路 IO 上,與Get一次發出一次請求相比,能不能通過一段腳本來實現大部分邏輯呢?
有的,Redis支持 Lua腳本:
Lua 是一種輕量小巧的腳本語言,用標準C語言編寫並以源代碼形式開放, 其設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴展和定製功能。
大致意思就是,直接向Redis發送一段腳本或者讓它直接本地讀取一段腳本從而直接實現所有的邏輯。
/// <summary> /// 如果 大於10(AccountNum) 就返回1 否則就增加一條集合中的元素 並返回 空 /// </summary> /// <param name="zcardKey"></param> /// <param name="score"></param> /// <param name="zcardValue"></param> /// <param name="AccountNum"></param> /// <returns></returns> public string LuaAddAccoundSorted(string zcardKey, double score, string zcardValue, int AccountNum) { string str = "local uu = redis.call('zcard',@zcardKey) if (uu >=tonumber(@AccountNum)) then return 1 else redis.call('zadd',@zcardKey,@score,@zcardValue) end"; var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str), new { zcardKey = zcardKey, score = score, zcardValue = zcardValue, AccountNum=AccountNum }); return re.ToString(); }
local uu就是申明一個為名uu的變數的意思,redis.call就是redis命令,這段腳本意思就是如果 大於10(AccountNum) 就返回1 否則就增加一條集合中的元素 並返回 空。
管道內元素處理的方法就是:
/// <summary> /// 遍歷當前所有首碼的有序集合,如果數量為0,那麼就返回1 否則 就刪除 滿足最大分值條件區間的元素,如果該集合個數為0則消失 /// </summary> /// <param name="zcardPrefix"></param> /// <param name="score"></param> /// <returns></returns> public string LuaForeachRemove(string zcardPrefix, double score) { StringBuilder str = new StringBuilder(); str.Append("local uu = redis.call('keys',@zcardPrefix) "); //聲明一個變數 去獲取 模糊查詢的結果集合 str.Append("if(#uu==0) then"); //如果集合長度=0 str.Append(" return 1 "); str.Append("else "); str.Append(" for i=1,#uu do "); //遍歷 str.Append(" redis.call('ZREMRANGEBYSCORE',uu[i],0,@score) "); //刪除從0 到 該score 積分區間的元素 str.Append(" if(redis.call('zcard',uu[i])==0) then "); //如果管道長度=0 str.Append(" redis.call('del',uu[i]) "); //刪除 str.Append(" end "); str.Append(" end "); str.Append("end "); var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str.ToString()), new { zcardPrefix = zcardPrefix + "*", score = score }); return re.ToString();
這2段代碼通過發送Lua腳本的形式來完成了整個過程,因為Redis的網路模型原因,所以把LuaForeachRemove方法給提出來做個服務來單獨處理即可。至於那種多容器多線程的實現,則完全可以開多個Redis的實例來實現。最後放上效果圖。
最後,我把這些都給做成了個Demo。但是沒有找到合適的上傳網盤,所以大家可以留郵箱(留了就發),或者直接加QQ群文件自取,討論交流:166843154
我喜歡和我一樣的人交朋友,不被環境影響,自己是自己的老師,歡迎加群 .Net web交流群, QQ群:166843154 欲望與掙扎
作者:小曾
出處:http://www.cnblogs.com/1996V/p/8127576.html 歡迎轉載,但任何轉載必須保留完整文章及博客園出處,在顯要地方顯示署名以及原文鏈接。
.Net交流群, QQ群:166843154 欲望與掙扎