微軟支持併發的Key-Value 存儲庫有C++與C#兩個版本。號稱迄今為止最快的併發鍵值存儲。下麵是C#版本翻譯: FASTER C#可在.NET Framework和.NET Core中運行,並且可以在單線程和併發設置中使用。經過測試,可以在Windows和Linux上使用。它公開了一種API, ...
微軟支持併發的Key-Value 存儲庫有C++與C#兩個版本。號稱迄今為止最快的併發鍵值存儲。下麵是C#版本翻譯:
FASTER C#可在.NET Framework和.NET Core中運行,並且可以在單線程和併發設置中使用。經過測試,可以在Windows和Linux上使用。它公開了一種API,該API可以執行讀取,盲更新(Upserts)和讀取-修改-寫入(RMW)操作的混合。它支持大於記憶體的數據,並接受IDevice將日誌存儲在文件中的實現。提供了IDevice本地文件系統的實現,也可以寫入遠程文件系統。或者將遠程存儲映射到本地文件系統中。FASTER可以用作傳統併發數據結構類似ConcurrentDictionary的高性能替代品,並且還支持大於記憶體的數據。它支持增量或非增量數據結構類型的檢查點。
FASTER支持三種基本操作:
- Read:從鍵值存儲中讀取數據
- Upsert:將值盲目向上插入到存儲中(不檢查先前的值)
- Read-Modify-Write:更新存儲區中的值,用於實現“求和”和“計數”之類的操作。
構建
在實例化FASTER之前,您需要創建FASTER將使用的存儲設備。如果使用的是可移植類型(byte、int、double)類型,則僅需要混合日誌設備。如果使用對象,則需要創建一個單獨的對象日誌設備。
IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");
然後,按如下方式創建一個FASTER實例:
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions>
(1L << 20, new Functions(), new LogSettings { LogDevice = log });
構造函數的類型參數
有六個基本概念,在實例化FASTER時作為通用類型參數提供:
- Key:這是鍵的類型,例如long。
- Value:這是存儲在FASTER中的值的類型。
- Input:這是調用Read或RMW時提供給FASTER的輸入類型。它可以被視為讀取或RMW操作的參數。例如,對於RMW,可是增量累加到值。
- Output:這是讀操作的輸出類型,將值的相關部分複製到輸出。
- Context:操作的用戶定義上下文,如果沒有必要使用Empty。
- Functions:需要回調時,使用IFunctions<>調用。
回調函數
用戶提供一個實例化IFunctions<>。此類型封裝了所有回調,下麵將對其進行介紹:
- SingleReader和併發讀ConcurrentReader:這些用於讀取存儲值並將它們複製到Output。單個讀取器可以假定沒有併發操作。
- SingleWriter和ConcurrentWriter:這些用於將值從源值寫入存儲。
- Completion callbacks完成回調:各種操作完成時調用。
- RMWUpdaters:用戶指定了三個更新器,InitialUpdater,InPlaceUpdater和CopyUpdater。它們一起用於實現RMW操作。
- Hash Table Siz哈希表大小:這是分配給FASTER的存儲行數,其中每個行為64位元組。
- LogSettings 日誌設置:這些設置與日誌的大小、設備。
- Checkpoint設置:這些是與檢查相關的設置,例如檢查類型和文件夾。
- Serialization序列化設置:用於為鍵和值類型提供自定義序列化程式。序列化程式實現IObjectSerializer<Key>鍵和IObjectSerializer<Value>值。只有C#類對象非可移植類型才需要這些。
- Key比較器:用於為key提供更好的比較器IFasterEqualityComparer<Key>。
構造函數參數
FASTER的總記憶體占用量由以下參數控制:
- 哈希表大小:此參數(第一個構造函數參數)乘以64是記憶體中哈希表的大小(以位元組為單位)。
- 日誌大小:logSettings.MemorySizeBits表示混合日誌的記憶體部分的大小(以位為單位)。換句話說對於參數設置B,日誌的大小為2 ^ B位元組。如果日誌指向類對象,則此大小不包括對象的大小,因為FASTER無法訪問此信息。日誌的較舊部分溢出到存儲中。
Sessions (Threads)會話(線程)
實例化FASTER之後,線程可以使用Session來使用FASTER
fht.StartSession();
fht.StopSession();
當所有線程都在FASTER上完成操作後,您最終銷毀FASTER實例:
fht.Dispose();
示例
以下是一個簡單示例,其中所有數據都在記憶體中,因此我們不必擔心掛起的I / O操作。在此示例中也沒有檢查點。
public static void Test()
{
var log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
var fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
fht.StartSession();
long key = 1, value = 1, input = 10, output = 0;
fht.Upsert(ref key, ref value, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value + 20);
fht.StopSession();
fht.Dispose();
log.Close();
}
此示例的函數:
public class Funcs : IFunctions<long, long, long, long, Empty>
{
public void SingleReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void SingleWriter(ref long key, ref long src, ref long dst) => dst = src;
public void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void ConcurrentWriter(ref long key, ref long src, ref long dst) => dst = src;
public void InitialUpdater(ref long key, ref long input, ref long value) => value = input;
public void CopyUpdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input;
public void InPlaceUpdater(ref long key, ref long input, ref long value) => value += input;
public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { }
public void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status s) { }
public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
}
更多例子
檢查點和恢復
FASTER支持基於檢查點的恢復。每個新的檢查點都會保留(或使之持久)其他用戶操作(讀取,更新或RMW)。FASTER允許客戶端線程跟蹤已持久的操作和未使用基於會話的API的操作。
回想一下,每個FASTER線程都會啟動一個與唯一的Guid相關聯的會話。所有FASTER線程操作(讀取,Upsert,RMW)都帶有單調序列號。在任何時間點,都可以調用Checkpoint以啟動FASTER的非同步檢查點。在調用之後Checkpoint,(最終)向每個FASTER線程通知一個序列號,這樣可以確保直到該序列號之前的所有操作以及在該序列號之後沒有任何操作被保留為該檢查點的一部分。FASTER線程可以使用此序列號來清除等待執行的操作的任何記憶體緩衝區。
在恢復期間,線程可以使用繼續使用相同的Guid進行會話ContinueSession。該函數返回線程本地序列號,直到恢復該會話哈希為止。從那時起,新線程可以使用此信息來重播所有未提交的操作。
下麵一個單線程的簡單恢復示例。
public class PersistenceExample
{
private FasterKV<long, long, long, long, Empty, Funcs> fht;
private IDevice log;
public PersistenceExample()
{
log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
}
public void Run()
{
IssuePeriodicCheckpoints();
RunSession();
}
public void Continue()
{
fht.Recover();
IssuePeriodicCheckpoints();
ContinueSession();
}
/* Helper Functions */
private void RunSession()
{
Guid guid = fht.StartSession();
System.IO.File.WriteAllText(@"C:\\Temp\\session1.txt", guid.ToString());
long seq = 0; // sequence identifier
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
// fht.StopSession() - outside infinite loop
}
private void ContinueSession()
{
string guidText = System.IO.File.ReadAllText(@"C:\\Temp\session1.txt");
Guid sessionGuid = Guid.Parse(guidText);
long seq = fht.ContinueSession(sessionGuid); // recovered seq identifier
seq++;
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
}
private void IssuePeriodicCheckpoints()
{
var t = new Thread(() =>
{
while(true)
{
Thread.Sleep(10000);
fht.StartSession();
fht.TakeCheckpoint(out Guid token);
fht.CompleteCheckpoint(token, true);
fht.StopSession();
}
});
t.Start();
}
}
FASTER支持兩種檢查點概念:“快照”和“摺疊”。前者是將記憶體中的完整快照複製到一個單獨的快照文件中,而後者是自上一個檢查點以來更改的增量檢查點。摺疊有效地將混合日誌的只讀標記移到尾部,因此所有數據都作為同一混合日誌的一部分保留(沒有單獨的快照文件)。所有後續更新均寫入新的混合日誌尾部位置,這使Fold-Over具有增量性質。
項目路徑:
https://github.com/Microsoft/FASTER/tree/master/cs