什麼是Ring Buffer?顧名思義,就是一個記憶體環,每一次讀寫操作都迴圈利用這個記憶體環,從而避免頻繁分配和回收記憶體,減輕GC壓力,同時由於Ring Buffer可以實現為無鎖的隊列,從而整體上大幅提高系統性能。 ...
最近常收到SOD框架的朋友報告的SOD的SQL日誌功能報錯:文件句柄丟失。經過分析得知,這些朋友使用SOD框架開發了訪問量比較大的系統,由於忘記關閉SQL日誌功能所以出現了很高頻率的日誌寫入操作,從而偶然引起錯誤。後來我建議只記錄出錯的或者執行時間較長的SQL信息,暫時解決了此問題。但是作為一個熱心造輪子的人,一定要看看能不能造一個更好的輪子出來。
前面說的錯誤原因已經很直白了,就是頻繁的日誌寫入導致的,那麼解決方案就是將多次寫入操作合併成一次寫入操作,並且採用非同步寫入方式。要保存多次操作的內容就要有一個類似“隊列”的東西來保存,而一般的線程安全的隊列,都是“有鎖隊列”,在性能要求很高的系統中,不希望在日誌記錄這個地方耗費多一點計算資源,所以最好有一個“無鎖隊列”,因此最佳方案就是Ring Buffer(環形緩衝區)了。
什麼是Ring Buffer?顧名思義,就是一個記憶體環,每一次讀寫操作都迴圈利用這個記憶體環,從而避免頻繁分配和回收記憶體,減輕GC壓力,同時由於Ring Buffer可以實現為無鎖的隊列,從而整體上大幅提高系統性能。Ring Buffer的示意圖如下,有關具體原理,請參考此文《Ring Buffer 有什麼特別? 》。
上文並沒有詳細說明如何具體讀寫Ring Buffer,但是原理介紹已經足夠我們怎麼寫一個Ring Buffer程式了,接下來看看我在 .NET上的實現。
首先,定一個存放數據的數組,記住一定要用數組,它是實現Ring Buffer的關鍵並且CPU友好。
const int C_BUFFER_SIZE = 10;//寫入次數緩衝區大小,每次的實際內容大小不固定 string[] RingBuffer = new string[C_BUFFER_SIZE];
int writedTimes = 0;
變數writedTimes 記錄寫入次數,它會一直遞增,不過為了線程安全的遞增且不使用托管鎖,需要使用原子鎖Interlocked。之後,根據每次 writedTimes 跟環形緩衝區的大小求餘數,得到當前要寫入的數組位置:
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; }
Ring Buffer的核心代碼就這麼點,調用此方法,會一直往緩衝區寫入數據而不會“溢出”,所以寫入Ring Buffer效率很高。
一個隊列如果只生產不消費肯定不行的,那麼如何及時消費Ring Buffer的數據呢?簡單的方案就是當Ring Buffer“寫滿”的時候一次性將數據“消費”掉。註意這裡的“寫滿”僅僅是指寫入位置 index達到了數組最大索引位置,而“消費”也不同於常見的堆棧,隊列等數據結構,只是讀取緩衝區的數據而不會移除它。
所以前面的代碼只需要稍加改造:
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
writeP == 0 表示當前一輪的緩衝區已經寫滿,然後調用函數 FlushFile 將Ring Buffer的數據連接起來,整體寫入文件。
void FlushFile(string fileName, string text) { using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous)) { byte[] buffer = System.Text.Encoding.UTF8.GetBytes(text); IAsyncResult writeResult = fs.BeginWrite(buffer, 0, buffer.Length, (asyncResult) => { fs.EndWrite(asyncResult); }, fs); //fs.EndWrite(writeResult);//這種方法非同步起不到效果 fs.Flush(); } }
在函數 FlushFile 中我們使用了非同步寫入文件的技術,註意 FileOptions.Asynchronous ,使用它才可以真正利用Windows的完成埠IOCP,將文件非同步寫入。
當然這段代碼也可以使用.NET最新版本支持的 async/await ,不過我要讓SOD框架繼續支持.NET 2.0,所以只好這樣寫了。
現在,我們可以開多線程來測試這個迴圈隊列效果怎麼樣:
Task[] arrTask = new Task[20]; for (int i = 0; i < arrTask.Length; i++) { arrTask[i] = new Task(obj => SaveFile( (int)obj) ,i); } for (int i = 0; i < arrTask.Length; i++) { arrTask[i].Start(); } Task.WaitAll(arrTask); MessageBox.Show(arrTask.Length +" Task All OK.");
這裡開啟20個Task任務線程來寫入文件,運行此程式,發現20個線程才寫入了10條數據,分析很久才發現,文件非同步IO太快的話,會有緩衝區丟失,第一次寫入的10條數據無法寫入文件,多運行幾次就沒有問題了。所以還是得想法解決此問題。
通常情況下我們都是使用托管鎖來解決這種併發問題,但本文的目的就是要實現一個“無鎖環形緩衝區”,不能在此“功虧一簣”,所以此時“信號量”上場了。
同步可以分為鎖定和信號同步,信號同步機制中涉及的類型都繼承自抽象類WaitHandle,這些類型有EventWaitHandle(類型化為AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。見下圖:
首先聲明一個 ManualResetEvent對象:
ManualResetEvent ChangeEvent = new ManualResetEvent(true);
這裡我們將 ManualResetEvent 對象設置成 “終止狀態”,意味著程式一開始是允許所有線程不等待的,當我們需要消費Ring Buffer的時候再將 ManualResetEvent 設置成“非終止狀態”,阻塞其它線程。簡單說就是當要寫文件的時候將環形緩衝區阻塞,直到文件寫完才允許繼續寫入環形緩衝區。
對應的新的代碼調整如下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(); int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { ChangeEvent.Reset(); string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
然後,再FlushFile 方法的 回掉方法中,加入設置終止狀態的代碼,部分代碼如下:
(asyncResult) =>
{
fs.EndWrite(asyncResult);
ChangeEvent.Set();
}
OK,現在我們的程式具備高性能的安全的寫入日誌文件的功能了,我們來看看演示程式測試的日誌結果實例:
Arr[0]:Thread index:0--FFFFFFF Arr[1]:Thread index:1--FFFFFFF Arr[2]:Thread index:8--FFFFFFF Arr[3]:Thread index:9--FFFFFFF Arr[4]:Thread index:3--FFFFFFF Arr[5]:Thread index:2--FFFFFFF Arr[6]:Thread index:4--FFFFFFF Arr[7]:Thread index:10--FFFFFFF Arr[8]:Thread index:5--FFFFFFF Arr[9]:Thread index:6--FFFFFFF Arr[0]:Thread index:7--FFFFFFF Arr[1]:Thread index:11--FFFFFFF Arr[2]:Thread index:12--FFFFFFF Arr[3]:Thread index:13--FFFFFFF Arr[4]:Thread index:14--FFFFFFF Arr[5]:Thread index:15--FFFFFFF Arr[6]:Thread index:16--FFFFFFF Arr[7]:Thread index:17--FFFFFFF Arr[8]:Thread index:18--FFFFFFF Arr[9]:Thread index:19--FFFFFFF
測試結果符合預期!
到此,我們今天的主題就全部介紹完成了,不過要讓本文的代碼能夠符合實際的運行,還要解決每次只寫入少量數據並且將它定期寫入日誌文件的問題,這裡貼出真正的局部代碼:
PS:有朋友說採用信號量並不能完全保證程式安全,查閱了MSDN也說如果信號量狀態改變還沒有來得及應用,那麼是起不到作用的,所以還需要檢查業務狀態標記,也就是在設置非終止狀態後,馬上設置一個操作標記,在其它線程中,需要檢查此標記,以避免“漏網之魚”引起不期望的結果。
再具體實現上,我們可以實現一個“自旋鎖”,迴圈檢查此狀態標記,為了防止發生死鎖,還需要有鎖超時機制,代碼如下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(10000); int currP= Interlocked.Increment(ref WritedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; if (writeP == 0 ) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; LastWriteTime = DateTime.Now; WritingIndex = 0; SaveFile(fileName,RingBuffer); } else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds > C_WRITE_TIMESPAN) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; int length = index - WritingIndex + 1; if (length <= 0) length = 1; string[] newArr = new string[length]; Array.Copy(RingBuffer, WritingIndex, newArr, 0, length); LastWriteTime = DateTime.Now; WritingIndex = index + 1; SaveFile(fileName, newArr); } else { //防止漏網之魚的線程在信號量產生作用之前修改數據 //採用“自旋鎖”等待 int count = 0; while (IsReading) { if (count++ > 10000000) { Thread.Sleep(50); break; } } RingBuffer[index] = " Arr[" + index + "]:" + text; } }
完整的Ring Buffer代碼會在最新版本的SOD框架源碼中,有關本篇文章測試程式的完整源碼,請加QQ群討論獲取,
群號碼:SOD框架高級群 18215717 ,加群請註明 PDF.NET技術交流 ,否則可能被拒絕。