物聯網海量設備心跳註冊,脫網清除——多線程高併發互斥鎖落地 [toc] 1.應用背景 在物聯網應用場景中,需要維護很多個設備的連接,比如基於TCP socket通信的長連接,目的是為了獲取設備採集的信息,反向控制設備的數字開關或者模擬量。我們把這些TCP長連接都放入了基於線程安全的Concurren ...
物聯網海量設備心跳註冊,脫網清除——多線程高併發互斥鎖落地
目錄
1.應用背景
在物聯網應用場景中,需要維護很多個設備的連接,比如基於TCP socket通信的長連接,目的是為了獲取設備採集的信息,反向控制設備的數字開關或者模擬量。我們把這些TCP長連接都放入了基於線程安全的ConcurrentDictionary激活字典表中,IP地址作為key,設備箱領域模型作為value。我們需要把激活設備箱的字典表維護好,需要將超時沒有心跳的設備,我們可以稱之為脫網設備,給清理出激活字典表,寫入到脫網告警字典表中去。當脫網設備下次再有心跳時,可以再次移入到激活字典表中,從而再產生恢復告警,進行一系列其他動作。
2.整體框架
2.1.心跳註冊框架
2.1.1.海量設備
因為要模擬海量設備的TCP場景,我們利用模擬器生成了12000台模擬設備。8台真實設備。
2.1.2.心跳上報Handler流程
詳細心跳上報流程詳見上述框架圖
- 第一次建立TCP長連接,並且上報心跳報文;
- socket緩存會先處理TCP中存在的粘包,具體方法可參見此篇博文 TCP粘包處理現象及其解決方案——基於NewLife.Net網路庫的管道式幀長粘包處理方法
- 然後會觸發OnReceive中的e事件,從而傳入粘包處理後的message;
- 判斷包有效性,因為這方面比較簡單,根據不同協議寫一個類來處理即可,這裡不再展開;
- 包有效載荷的CRC判斷,具體實現可參見此篇博文 基於Modbus三種CRC16校驗方法的性能對比;
- 包類型解析(這裡特指解析出心跳包);
- 心跳包解析,具體可參見這兩篇博文。深入淺出C#結構體——封裝乙太網心跳包的結構為例, 類與結構體性能對比測試——以封裝網路心跳包為例
- 最終將設備新增激活字典表(第一次心跳)或者在激活字典表刷新心跳時間(非第一次心跳)。
突然發現我可以寫一個物聯網的採集系統的系列了,組織一個目錄。希望自己堅持下去吧。
2.2.脫網清理框架
2.2.1.激活字典表清理脫網設備方法
原理很簡單,遍歷字典表中超過設置的檢測周期,篩選到一個字典的IEnumerable中去,然後在激活字典表中刪除對應超時key(這裡就是指IP地址)即可。當然這裡的_internal周期可以*N,多個周期,自行在配置文件中設置即可,配置文件如下:
"ipboxNumStaticInternal": 12
public static void DeleteDeadBoxFromActiveBox(in _internal)
{
{
var outTime = DateTime.Now.AddSeconds(-_internal);
var iboxTimeOutList = iboxActiveDictionary.Where(q => (outTime > q.Value.UpdateTime));//.Select(x=> iboxActiveDictionary[x.Key]) ;
foreach (var item in iboxTimeOutList)
{
iboxActiveDictionary.Remove(item.Key);
}
}
}
2.2.2.脫網清理流程圖
這裡主要開啟了一個系統定時器,主動會去調用清理脫網設備方法,調用時間間隔即ipboxNumStaticInternal。代碼如下:
public void systemTimerStart()
{
var interval = ReadTheInternalFromSetting();
_systemTimer = new Timer(state =>
{
IBoxActiveDicManager.DeleteDeadBoxFromActiveBo(_internal);
Console.WriteLine("{1},激活設備數量:{0}\n",IBoxActiveDicManager.iboxActiveDictionary.Count,DateTime.Now);
}, null, interval, interval);
Console.WriteLine("PemsCom採集系統時鐘已經開啟");
LoggerHelper.Info("PemsCom採集系統時鐘已經開啟");
}
/// <summary>
/// 配置文件讀入時間間隔方法
/// </summary>
/// <returns></returns>
private int ReadTheInternalFromSetting()
{
_internal = int.Parse(Appsettings.app(new string[] {"ipboxNumStaticInternal" }));
Console.WriteLine("PemsCom採集系統時鐘配置參數已經讀");
LoggerHelper.Info("PemsCom採集系統時鐘配置參數已經讀");
return Convert.ToInt32(TimeSpan.FromSecond(_internal).TotalMilliseconds);
}
3.多線程與高併發說明
3.1.多線程說明
這裡會有很多的線程讓CPU來輪片執行,比如:
- 12008個Receive事件觸發線程;
- 定時清除脫網設備線程;
- 主線程,監控命令行輸入,並執行對應的命令;
舉個實際的例子,以圖為證
12008台設備,每秒處理接受網路包的峰峰值是9218個包,就是在某一秒,CPU共輪片執行了9218個線程。比如是雙核4線程的,則9218/4=2304.5。即CPU在1秒輪片執行了2305次。即0.43毫秒就輪片執行一次。
3.2.高併發說明
其實3.1已經解釋了高併發。在某一秒,需要處理的接收事件有接近1萬件。而這一時刻的執行順序是無序的,9218里的這麼多線程,我們不知道哪個先執行,哪個後執行。如果不認為地加一些邏輯控制,比如我們今天要介紹的互斥鎖,就會出現一些異常現象。
4.多線程高併發造成的異常現象
這裡只描述現象,原因會在下麵5.分析異常原因 做具體描述。
4.1.空引用
異常所在的位置:心跳處理類如下。
public class HeartHandler
{
static string _deviceIndex = Appsettings.app(new string[] { "DeviceIndex" });
private static IBoxActive iboxActive;
public static void Register(TcpHeartPacket heartPacket,int sessId)
{
UInt32 IP;
UInt64 mac;
if (_deviceIndex == "IP")
{
IP =(UInt32)BitConverter.ToUInt32(heartPacket.IP, 0);
if (IBoxActiveDicManager.GetBoxActive(IP, out iboxActive) != true)
{
IBoxActiveDicManager.iboxActiveDictionary.TryAdd(IP, iboxActive);
iboxActive.SessID = sessId;
}
}
else
{
mac = (UInt64)BitConverter.ToUInt64(heartPacket.Mac, 0);
if (IBoxActiveDicManager.GetBoxActive(mac, out iboxActive) != true)
{
IBoxActiveDicManager.iboxActiveDictionary.TryAdd(mac, iboxActive);
iboxActive.SessID = sessId;
}
}
//引用類型,智能指針,使用方便
iboxActive.UpdateTime = DateTime.Now;
}
}
4.2.字典表裡元素賦值不成功
/// <summary>
/// 查詢激活設備箱字典中是否有存在上報的設備箱,
/// 存在返回true,不存在返回false,並且新建好設備箱模型
/// </summary>
/// <param name="mac"></param>
/// <param name="iboxActive"></param>
/// <returns></returns>
public static bool GetBoxActive(UInt32 IP, out IBoxActive iboxActive)
{
if (iboxActiveDictionary.TryGetValue(IP, outiboxActive))
{
return true;
}
iboxActive = new IBoxActive();
iboxActive.IP = IP;
if (iboxActive.IP != IP)
{
LoggerHelper.Error(string.Format("實例化賦值不成功.iboxActive.IP:{0};IP{1}", iboxActive.IP, IP));
}
return false;
}
有沒有感覺很奇怪,上一句都賦值了,下一句對比就不相等。但是在多線程大併發里就是有這種可能,下麵會詳細分析。
4.3.統計設備總數不正確
因為12008臺大併發時很容易出錯,所以改成了1000台。如下統計數據會有出錯情況,這同樣也是因為多線程高併發引起的錯誤。
5.分析異常原因
5.1.造成空引用的原因
其實第4的三點原因都是同一個原因造成,所以在5.1會詳細闡述,5.2,,5.3只做簡單闡述。這裡敲下黑板,分析多線程高併發的異常問題,程式運行的特點就是見縫就插,就像個老司機一樣,概括起來就是線程與線程之間的無序性。比如我們設備心跳線程正在更新設備心跳時間的時候。脫網清理線程就把該設備給清理掉了。如此一來,時間沒法賦值給空對象(已被脫網線程給清理)。因此只能報空引用異常,對沒錯,就是這麼簡單,耗費了我很長時間去debug跟思考這個異常。
5.2.設備IP賦值不成功原因
同樣,在創建了設備實例之後,IP賦值完成,剛好脫網清除設備線程運行清除了設備,當對比的時候,引用原來的地址,字典的原來地址已經存了其他設備箱的IP,所以IP地址不相等。
5.3.統計設備總數不正確原因
原因其實是5.2造成的,沒法成功註冊,當然數量就不對啦。
6.解決思路
就是當我在創建激活設備實例(第一次心跳註冊)或者更新心跳時間的時候(非第一次註冊),不要讓無序的脫網清除線程運行。敲黑板:就是保證心跳處理註冊過程的原子性。對,其實這裡很像關係型資料庫的事務,原子性。原子性就是對抗程式無序造成異常的有力武器。我們可以在註冊心跳處理方法上加個互斥鎖,讓編譯器跟運行時去安排更加合理的執行順序。
7.代碼實現
代碼很簡單。
//定義一把鎖
public static Mutex activeIpboxDicMutex = new Mutex();
//設備箱註冊加鎖。異常全部消除
IBoxActiveDicManager.activeIpboxDicMWaitOne();
HeartHandler.Register(tcpHeartPacsessionId);
IBoxActiveDicManager.activeIpboxDicMReleaseMutex();
這裡插入一下事務的使用,也是很類似的,把我們的主業務加中中間,類比方便大家理解記憶。就像夾心餅干(瞎扯)。
unitOfWork.BeginTransaction();
// Adds new device
unitOfWork.DeviceRepository.Add(device);
// Commit transaction
unitOfWork.Commit();
當然也可以給設備箱脫網清除線程加鎖。
IBoxActiveDicManager.activeIpboxDicMutex.WaitOne();
IBoxActiveDicManager.DeleteDeadBoxFromActiveBox(_internal);
IBoxActiveDicManager.activeIpboxDicMutex.ReleaseMutex();
考慮到脫網清除線程會損耗部分性能,我也測試了去掉該鎖的情況,也不會有第4的3個異常,至此問題全部解決。
8.小結
-
模擬設備數量小測不出這個問題,如此看出海量設備的重要性,因為現實情況肯定會出現以上三個問題,而且都是很嚴重很致命的問題。好的測試方法可以把問題扼殺在搖籃中;
-
多線程高併發時容易出現這樣那樣的異常,要懷著敬畏之心去思考,去解決問題;
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://www.cnblogs.com/JerryMouseLi/p/12709048.html