基於C# Socket實現的簡單的Redis客戶端

来源:https://www.cnblogs.com/wucy/archive/2023/11/13/csharp_socket_redis_client.html
-Advertisement-
Play Games

前言 Redis是一款強大的高性能鍵值存儲資料庫,也是目前NOSQL中最流行比較流行的一款資料庫,它在廣泛的應用場景中扮演著至關重要的角色,包括但不限於緩存、消息隊列、會話存儲等。在本文中,我們將介紹如何基於C# Socket來實現一個簡單的Redis客戶端類RedisClient,來演示構建請求和 ...


前言

    Redis是一款強大的高性能鍵值存儲資料庫,也是目前NOSQL最流行比較流行的一款資料庫,它在廣泛的應用場景中扮演著至關重要的角色,包括但不限於緩存、消息隊列、會話存儲等。在本文中,我們將介紹如何基於C# Socket來實現一個簡單的Redis客戶端類RedisClient,來演示構建請求和輸出的相關通信機制。需要註意的是本文只是著重展示如何基於原生的Socket方式與Redis Server進行通信,並不是構建一個強大的Redis開發工具包

Redis簡介

    Redis(Remote Dictionary Server)是一個記憶體資料庫,它支持了非常豐富的數據結構,包括字元串、列表、集合、散列、有序集合等。Redis 提供了高性能的讀寫操作,可以用於緩存數據、消息隊列、分散式鎖、會話管理等多種用途。Redis 通常以鍵值對的方式存儲數據,每個鍵都與一個值相關聯,值的類型可以是字元串、列表、散列等。Redis不僅提供了豐富的命令集,用於操作存儲在資料庫中的數據,還提供了Redis serialization protocol (RESP) 協議來解析Redis Server返回的數據。相關的文檔地址如下所示:

Redis 命令指南

    Redis命令是與Redis伺服器進行通信的主要方式,通俗點就是發送指定格式的指令用於執行各種操作,包括數據存儲、檢索、修改和刪除等。以下是一些日常使用過程中常見的Redis命令及其用途:

  1. GET 和 SET 命令

    • GET key: 用於獲取指定鍵的值。
    • SET key value: 用於設置指定鍵的值.
  2. DEL 命令

    • DEL key: 用於刪除指定鍵.
  3. EXPIRE 和 TTL 命令

    • EXPIRE key seconds: 用於為指定鍵設置過期時間(秒).
    • TTL key: 用於獲取指定鍵的剩餘過期時間(秒).

    註意這裡的時間單位是秒

  4. INCR 和 DECR 命令

    • INCR key: 用於遞增指定鍵的值.
    • DECR key: 用於遞減指定鍵的值.
  5. RPUSH 和 LPOP 命令

    • RPUSH key value: 用於將值添加到列表的右側.
    • LPOP key: 用於從列表的左側彈出一個值.
  6. HSET 和 HGET 命令

    • HSET key field value: 用於設置哈希表中指定欄位的值.
    • HGET key field: 用於獲取哈希表中指定欄位的值.
  7. PUBLISH 和 SUBSCRIBE 命令

    • PUBLISH channel message: 用於向指定頻道發佈消息.
    • SUBSCRIBE channel: 用於訂閱指定頻道的消息.

當然 Redis 支持的命令遠不止這些,它還包括對集合、有序集合、點陣圖、HyperLogLog 等數據結構的操作,以及事務、Lua 腳本執行等高級功能。我們接下來演示的時候也只是展示幾個大家比較熟悉的指令,這也是我們學習新知識的時候經常使用的方式,先從最簡單最容易的開始入手,循序漸進,這也是微精通所提倡的方式。

Redis協議(RESP)

Redis Serialization Protocol (RESP) 是 Redis 使用的二進位協議,用於客戶端和伺服器之間的通信。我們可以通過該協議解析Redis伺服器返回的命令格式,解析我們想要的數據。RESP具有簡潔易解析的特點

  • 簡單字元串協議:

    • 格式: +OK\r\n
    • 第一個位元組是"+”,後跟消息內容,以"\r\n"(回車和換行)結束。
    • 示例:+OK\r\n
  • 批量字元串協議:

    • 格式: $5\r\nhello\r\n
    • 第一個位元組是"$",後跟字元串的位元組長度,然後是實際的字元串內容,最後以"\r\n"結束。
    • 示例:$5\r\nhello\r\n
  • 整數協議:

    • 格式: :42\r\n
    • 第一個位元組是":",後跟整數的文本表示,以"\r\n"結束。
    • 示例::42\r\n
  • 數組協議:

    • 格式: *3\r\n:1\r\n:2\r\n:3\r\n
    • 第一個位元組是"*",後跟數組中元素的數量,然後是數組中每個元素的 RESP 表示,以"\r\n"結束。
    • 示例:*3\r\n:1\r\n:2\r\n:3\r\n
  • 錯誤協議:

    • 格式: -Error message\r\n
    • 第一個位元組是"-",後跟錯誤消息內容,以"\r\n"結束。
    • 示例:-Error message\r\n

需要註意的是字元串協議裡面的長度不是具體字元的長度,而是對應的UTF8對應的位元組數組的長度,這一點對於我們解析返回的數據很重要,否則獲取數據的時候會影響數據的完整性。

RESP協議是Redis高效性能的關鍵之一,它相對比較加單,不需要解析各種頭信息等,這使得Redis能夠在處理大規模數據和請求時表現出色。瞭解RESP協議可以幫助您更好地理解Redis客戶端類 RedisClient 的內部工作原理。可以理解為它屬於一種應用層面的協議,通過給定的數據格式解析出想要的數據,這也對我們在實際編程過程中,解決類似的問題,提供了一個不錯的思路。

實現RedisClient

    上面我們介紹了一些關於Redis的基礎概念,重點介紹了一下關於Redis的命令和RESP,接下來我們就結合上面的理論,基於C# Socket來簡單的模擬一下如何和Redis Server進行數據交互。主要就是結合Redis命令Redis 協議(RESP)來簡單的實現。

通信架子

首先來看一下類的結構

public class RedisClient : IDisposable, IAsyncDisposable
{
    //定義預設埠
    private readonly int DefaultPort = 6379;
    //定義預設地址
    private readonly string Host = "localhost";
    //心跳間隔,單位為毫秒
    private readonly int HeartbeatInterval = 30000; 

    private bool _isConnected;
    //心跳定時器
    private Timer _heartbeatTimer;
    private Socket _socket;

    public RedisClient(string host = "localhost", int defaultPort = 6379)
    {
        Host = host;
        DefaultPort = defaultPort;

        // 初始化心跳定時器
        _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
    }

    //連接方法
    public async Task ConnectAsync(int timeoutMilliseconds = 5000)
    {
        _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        var cts = new CancellationTokenSource(timeoutMilliseconds);
        await _socket.ConnectAsync(Host, DefaultPort, cts.Token);

        _isConnected = true;
    }

    //心跳方法
    private async void HeartbeatCallback(object state)
    {
        if (_isConnected)
        {
            var pingCommand = "PING\r\n";
            await SendCommandAsync(pingCommand);
        }
    }
    
    //釋放邏輯
    public void Dispose()
    {
        // 停止心跳定時器
        _heartbeatTimer.Dispose();

        if (_socket != null)
        {
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
        }
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return ValueTask.CompletedTask;
    }
}

上面的類定義了實現的大致通信結構,結構中主要涉及到的是通信相關的功能實現,包含Socket的初始化信息、預設的連連接信息、心跳方法、釋放邏輯等。首先,在構造函數中,指定了預設的Redis埠(6379)、地址(localhost),並初始化了心跳定時器。連接方法ConnectAsync通過Socket建立與Redis伺服器的TCP連接。心跳定時器HeartbeatCallback定期發送PING命令,確保與伺服器的連接保持活動。最後,Dispose方法用於釋放資源,包括停止心跳定時器和關閉Socket連接,實現了IDisposableIAsyncDisposable介面。這些功能為RedisClient類提供了基本的連接和資源管理能力。由於我對Socket編程也不是很熟悉,所以定義的可能不是很完善,有比較熟悉的同學,可以多多指導。

發送和解析

有了這個基礎的架子之後,我們可以在裡面填寫具體的實現邏輯了。首先我們來定義發送Redis命令和解析RESP的邏輯

//發送命令
public async Task<string> SendCommandAsync(string command)
{
    // 發送命令的實現
    if (!_isConnected)
    {
        // 如果連接已斷開,可以進行重連
        await ConnectAsync();
    }
    
    //Redis的命令是以\r\n為結尾的
    var request = Encoding.UTF8.GetBytes(command + "\r\n");
    //發送命令
    await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);

    var response = new StringBuilder();
    var remainingData = string.Empty;
    //初始化響應字元串和剩餘數據
    byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
    try
    {
        while (true)
        {
            //讀取返回信息
            var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
            //將接收到的數據添加到響應字元串
            var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
            //提取完整的響應並添加到響應字元串中
            var completeResponses = ExtractCompleteResponses(ref responseData);

            foreach (var completeResponse in completeResponses)
            {
                response.Append(completeResponse);
            }

            remainingData = responseData;
            //結果為\r\n讀取結束
            if (response.ToString().EndsWith("\r\n"))
            {
                break;
            }
        }
    }
    finally
    {
        //釋放緩衝區
        ArrayPool<byte>.Shared.Return(receiveBuffer);
    }

    //返回完整的響應字元串
    return response.ToString();
}

private List<string> ExtractCompleteResponses(ref string data)
{
    var completeResponses = new List<string>();

    while (true)
    {
        var index = data.IndexOf("\r\n");
        if (index >= 0)
        {
             // 提取一個完整的響應
            var completeResponse = data.Substring(0, index + 2);
            //將完整的響應添加到列表中
            completeResponses.Add(completeResponse);
            data = data.Substring(index + 2);
        }
        else
        {
            break;
        }
    }

    return completeResponses;
}

private string ParseResponse(string response)
{
    if (response.StartsWith("$"))
    {
        // 處理 Bulk Strings($)
        var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
        if (int.TryParse(lengthStr, out int length))
        {
            if (length == -1)
            {
                return null!;
            }

            string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
            byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
            string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
            return value;
        }
    }
    else if (response.StartsWith("+"))
    {
        // 處理 Simple Strings(+)
        return response.Substring(1, response.Length - 3);
    }
    else if (response.StartsWith(":"))
    {
        // 處理 Integers(:)
        var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
        if (int.TryParse(valueStr, out int value))
        {
            return value.ToString();
        }
    }

    // 如果響應格式不符合預期,拋出異常
    throw new InvalidOperationException(response);
}

上面邏輯涉及到發送和接收Redis消息的三個方法SendCommandAsyncExtractCompleteResponsesParseResponse。雖然上面代碼中有註釋,但是咱們分別I簡單的講解一下這三個方法

  • SendCommandAsync

    該方法主要目的是向 Redis 伺服器發送命令並非同步接收響應

    • 連接檢查:首先,檢查連接狀態 (_isConnected),如果連接已斷開,則調用 ConnectAsync 方法進行重連。
    • 命令轉換:將傳入的命令字元串轉換為 UTF-8 編碼的位元組數組,附加回車換行符 ("\r\n")。
    • 接收響應:使用非同步迴圈接收來自伺服器的響應。在每次接收之後,將接收到的數據添加到響應字元串中,並提取其中的完整響應。
    • 緩衝區管理:為了有效地處理接收到的數據,使用了一個緩衝區 (receiveBuffer),併在方法結束時通過 ArrayPool.Shared.Return 進行釋放。
    • 提取完整響應:調用 ExtractCompleteResponses 方法,該方法從響應數據中提取出一個或多個完整的響應,將其從數據中移除,並返回一個列表。
  • ExtractCompleteResponses

    該方法主要用於從接收到的數據中提取出一個或多個完整的響應。

    • completeResponses 列表:用於存儲提取出的完整響應的列表。
    • while 迴圈:迴圈進行以下操作,直到數據中沒有換行符為止。
    • 提取完整響應:如果找到換行符,就提取從數據開頭到換行符位置的子字元串,包括換行符本身,構成一個完整的響應。
    • 添加到列表:將提取出的完整響應添加到 completeResponses 列表中。
  • ParseResponse

    該方法主要用於解析從 Redis 伺服器接收到的響應字元串。

    • 如果響應以 $ 開頭,表示這是一個 Bulk String 類型的響應。
    • 如果響應以 + 開頭,表示這是一個 Simple String 類型的響應。
    • 如果響應以 : 開頭,表示這是一個 Integer 類型的響應。

簡單操作方法

上面有了和Redis通信的基本方法,也有瞭解析RESP協議的基礎方法,接下來咱們實現幾個簡單的Redis操作指令來展示一下Redis客戶端具體是如何工作的,簡單的幾個方法如下所示

//切換db操作
 public async Task SelectAsync(int dbIndex)
 {
     var command = $"SELECT {dbIndex}";
     await SendCommandAsync(command);
 }

//get操作
 public async Task<string> GetAsync(string key)
 {
     var command = $"GET {key}";
     return ParseResponse(await SendCommandAsync(command));
 }

//set操作
 public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
 {
     var command = $"SET {key} '{value}'";
     //判斷會否追加過期時間
     if (expiry.HasValue)
     {
         command += $" EX {expiry.Value.TotalSeconds}";
     }

     var response = ParseResponse(await SendCommandAsync(command));
     return response == "OK";
 }

 //支持過期時間的setnx操作
 public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
 {
    //因為預設的setnx方法不支持添加過期時間,為了保證操作的原子性,使用了lua
     var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";

     if (expiry.HasValue)
     {
         command += $" {expiry.Value.TotalSeconds}";
     }

     var response = ParseResponse(await SendCommandAsync(command));
     return response == "1";
 }
 
 //添加支持函過期時間的list push操作
 public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
 {
     var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
                     if tonumber(ARGV[2]) > 0 then
                         redis.call('EXPIRE', KEYS[1], ARGV[2])
                     end
                     return len";

     var keys = new string[] { key };
     var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };

     var response = await ExecuteLuaScriptAsync(script, keys, args);

     return long.Parse(response);
 }

//list pop操作
 public async Task<string> ListPopAsync(string key)
 {
     var command = $"LPOP {key}";
     return ParseResponse(await SendCommandAsync(command));
 }

 //listrange操作
 public async Task<List<string>> ListRangeAsync(string key, int start, int end)
 {
     var command = $"LRANGE {key} {start} {end}";
     var response = await SendCommandAsync(command);

     if (response.StartsWith("*0\r\n"))
     {
         return new List<string>();
     }
     
     //由於list range返回了是一個數組,所以單獨處理了一下,這裡我使用了正則,解析字元串也可以,方法隨意
     var values = new List<string>();
     var pattern = @"\$\d+\r\n(.*?)\r\n";
     MatchCollection matches = Regex.Matches(response, pattern);

     foreach (Match match in matches)
     {
         values.Add(match.Groups[1].Value);
     }

     return values;
 }

 //執行lua腳本的方法
 public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
 {
    //去除lua里的換行
     script = Regex.Replace(script, @"[\r\n]", "");
     // 構建EVAL命令,將Lua腳本、keys和args發送到Redis伺服器
     var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
     //拼接key和value參數
     if (keys != null && keys.Length != 0)
     {
         command += string.Join(" ", keys.Select(key => $"{key}"));
     }

     if (args != null && args.Length != 0)
     {
         command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
     }

     return ParseResponse(await SendCommandAsync(command));
 }

 //redis發佈操作
 public async Task SubscribeAsync(string channel, Action<string, string> handler)
 {
     await SendCommandAsync($"SUBSCRIBE {channel}");

     while (true)
     {
         var response = await SendCommandAsync(string.Empty);
         string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
         Match match = Regex.Match(response, pattern);

         if (match.Success)
         {
             string ch = match.Groups[2].Value;
             string message = match.Groups[3].Value;

             handler(ch, message);
         }
     }
 }

//redis訂閱操作
 public async Task PublishAsync(string channel, string message)
 {
     await SendCommandAsync($"PUBLISH {channel} {message}");
 }

上面方法中演示了幾個比較常見的操作,很簡單,主要是向大家展示Redis命令是如何發送的,從最簡單的GETSETLIST發佈訂閱執行LUA操作方面著手,如果對Redis命令比較熟悉的話,操作起來還是比較簡單的,這裡給大家講解幾個比較有代表的方法

  • 首先關於setnx方法,由於自帶的setnx方法不支持添加過期時間,為了保證操作的原子性,使用了lua腳本的方式
  • 自帶的lpush也就是上面ListPushAsync方法中封裝的操作,自帶的也是沒辦法給定過期時間的,為了保證操作的原子性,我在這裡也是用lua進行封裝
  • 關於執行lua腳本的時候的時候需要註意lua腳本的格式EVAL script numkeys [key [key ...]] [arg [arg ...]]腳本後面緊跟著的長度是key的個數這個需要註意
  • 最後,自行編寫命令的時候需要註意\r\n的處理和引號的轉義問題,當然研究的越深,遇到的問題越多

相信大家也看到了,這裡我封裝的都是幾個簡單的操作,難度繫數不大,因為主要是向大家演示Redis客戶端的發送和接收操作是什麼樣的,甚至我都是直接返回的字元串,真實使用的時候我們使用都是需要封裝序列化和反序列化操作的。

完整代碼

上面分別對RedisClient類中的方法進行了講解,接下來我把我封裝的類完整的給大家貼出來,由於封裝的只是幾個簡單的方法用於演示,所以也只有一個類,代碼量也不多,主要是為了方便大家理解,有想試驗的同學可以直接拿走

public class RedisClient : IDisposable, IAsyncDisposable
{
    private readonly int DefaultPort = 6379;
    private readonly string Host = "localhost";
    private readonly int HeartbeatInterval = 30000; 

    private bool _isConnected;
    private Timer _heartbeatTimer;
    private Socket _socket;

    public RedisClient(string host = "localhost", int defaultPort = 6379)
    {
        Host = host;
        DefaultPort = defaultPort;

        _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
    }

    public async Task ConnectAsync(int timeoutMilliseconds = 5000)
    {
        _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        var cts = new CancellationTokenSource(timeoutMilliseconds);
        await _socket.ConnectAsync(Host, DefaultPort, cts.Token);

        _isConnected = true;
    }

    public async Task SelectAsync(int dbIndex)
    {
        var command = $"SELECT {dbIndex}";
        await SendCommandAsync(command);
    }

    public async Task<string> GetAsync(string key)
    {
        var command = $"GET {key}";
        return ParseResponse(await SendCommandAsync(command));
    }

    public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
    {
        var command = $"SET {key} '{value}'";

        if (expiry.HasValue)
        {
            command += $" EX {expiry.Value.TotalSeconds}";
        }

        var response = ParseResponse(await SendCommandAsync(command));
        return response == "OK";
    }

    public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
    {
        var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";

        if (expiry.HasValue)
        {
            command += $" {expiry.Value.TotalSeconds}";
        }

        var response = ParseResponse(await SendCommandAsync(command));
        return response == "1";
    }

    public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
    {
        var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
                        if tonumber(ARGV[2]) > 0 then
                            redis.call('EXPIRE', KEYS[1], ARGV[2])
                        end
                        return len";

        var keys = new string[] { key };
        var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };

        var response = await ExecuteLuaScriptAsync(script, keys, args);

        return long.Parse(response);
    }

    public async Task<string> ListPopAsync(string key)
    {
        var command = $"LPOP {key}";
        return ParseResponse(await SendCommandAsync(command));
    }

    public async Task<long> ListLengthAsync(string key)
    {
        var command = $"LLEN {key}";
        return long.Parse(ParseResponse(await SendCommandAsync(command)));
    }

    public async Task<List<string>> ListRangeAsync(string key, int start, int end)
    {
        var command = $"LRANGE {key} {start} {end}";
        var response = await SendCommandAsync(command);

        if (response.StartsWith("*0\r\n"))
        {
            return new List<string>();
        }

        var values = new List<string>();
        var pattern = @"\$\d+\r\n(.*?)\r\n";
        MatchCollection matches = Regex.Matches(response, pattern);

        foreach (Match match in matches)
        {
            values.Add(match.Groups[1].Value);
        }

        return values;
    }

    public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
    {
        script = Regex.Replace(script, @"[\r\n]", "");
        var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
        if (keys != null && keys.Length != 0)
        {
            command += string.Join(" ", keys.Select(key => $"{key}"));
        }

        if (args != null && args.Length != 0)
        {
            command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
        }

        return ParseResponse(await SendCommandAsync(command));
    }

    public async Task SubscribeAsync(string channel, Action<string, string> handler)
    {
        await SendCommandAsync($"SUBSCRIBE {channel}");

        while (true)
        {
            var response = await SendCommandAsync(string.Empty);
            string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
            Match match = Regex.Match(response, pattern);

            if (match.Success)
            {
                string ch = match.Groups[2].Value;
                string message = match.Groups[3].Value;

                handler(ch, message);
            }
        }
    }

    public async Task PublishAsync(string channel, string message)
    {
        await SendCommandAsync($"PUBLISH {channel} {message}");
    }

    public async Task<string> SendCommandAsync(string command)
    {
        if (!_isConnected)
        {
            await ConnectAsync();
        }

        var request = Encoding.UTF8.GetBytes(command + "\r\n");
        await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);

        var response = new StringBuilder();
        var remainingData = string.Empty;

        byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
        try
        {
            while (true)
            {
                var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
                var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
                var completeResponses = ExtractCompleteResponses(ref responseData);

                foreach (var completeResponse in completeResponses)
                {
                    response.Append(completeResponse);
                }

                remainingData = responseData;

                if (response.ToString().EndsWith("\r\n"))
                {
                    break;
                }
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(receiveBuffer);
        }

        return response.ToString();
    }

    private List<string> ExtractCompleteResponses(ref string data)
    {
        var completeResponses = new List<string>();

        while (true)
        {
            var index = data.IndexOf("\r\n");
            if (index >= 0)
            {
                var completeResponse = data.Substring(0, index + 2);
                completeResponses.Add(completeResponse);
                data = data.Substring(index + 2);
            }
            else
            {
                break;
            }
        }

        return completeResponses;
    }

    private string ParseResponse(string response)
    {
        if (response.StartsWith("$"))
        {
            var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
            if (int.TryParse(lengthStr, out int length))
            {
                if (length == -1)
                {
                    return null!;
                }

                string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
                byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
                string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
                return value;
            }
        }
        else if (response.StartsWith("+"))
        {
            return response.Substring(1, response.Length - 3);
        }
        else if (response.StartsWith(":"))
        {
            var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
            if (int.TryParse(valueStr, out int value))
            {
                return value.ToString();
            }
        }

        throw new InvalidOperationException(response);
    }

    private async void HeartbeatCallback(object state)
    {
        if (_isConnected)
        {
            var pingCommand = "PING\r\n";
            await SendCommandAsync(pingCommand);
        }
    }

    public void Dispose()
    {
        _heartbeatTimer.Dispose();

        if (_socket != null)
        {
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
        }
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return ValueTask.CompletedTask;
    }
}

簡單使用RedisClient

上面我們封裝了RedisClient類,也講解了裡面實現的幾個簡單的方法,接下來我們就簡單的使用一下它,比較簡單直接上代碼

GET/SET

GET/SET是最基礎和最簡單的指令,沒啥可說的直接上代碼

using RedisClient redisClient = new RedisClient();
await redisClient.ConnectAsync();
//切換db
await redisClient.SelectAsync(3);

bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好嗎?", TimeSpan.FromSeconds(120));
string getResult = await redisClient.GetAsync("key:foo");
Console.WriteLine("get key:foo:" + getResult);

SETNX

SETNX比較常用,很多時候用在做分散式鎖的場景,判斷資源存不存在的時候經常使用

//第一次setnx返回true
bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("first setnx order:lock:" + setNxResult);

//第一次setnx返回false
setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("second setnx aname:foo:" + setNxResult);

PUB/SUB

這裡實現的SubscribeAsyncPublishAsync需要使用兩個RedisClient實例,因為我上面封裝的每個RedisClient只包含一個Socket實例所以ReceiveAsync方法是阻塞的。如果同一個實例的話SubscribeAsync的時候,在使用PublishAsync方法的時候會被阻塞,所以演示的時候使用了兩個RedisClient實例

_ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) => { Console.WriteLine($"接收消息:[{ch}]---[{msg}]"); });
Thread.Sleep(2000);

using RedisClient redisClient2 = new RedisClient();
await redisClient2.ConnectAsync();
for (int i = 0; i < 5; i++)
{
    await redisClient2.PublishAsync("order_msg_ch", $"發送消息{i}");
    Thread.Sleep(2000);
}

ExecuteLuaScriptAsync

動態執行lua的功能還是比較強大的,在之前的項目中,我也使用類似的功能。我們是模擬搶單/完成的場景,比如業務人員需要自行搶單,每個人最多搶幾單,超過閾值則搶單失敗,你需要把搶到的完成了才能繼續搶單,這種操作就需要藉助lua進行操作

//搶單的lua
string takeOrderLuaScript = @"
        local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
        if ordersTaken < tonumber(ARGV[1]) then
            redis.call('INCR', KEYS[1])
            return 1
        else
            return 0
        end";

//完成你手裡的訂單操作
string completeOrderLuaScript = @"
        local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
        if ordersTaken > 0 then
            redis.call('DECR', KEYS[1])
            return 1
        else
            return 0
        end";

//模擬搶單,最多搶兩單
string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });

//完成訂單
string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });

還有一個功能也是我們之前遇到的,就是使用Redis實現緩存最新的N條消息,舊的則被拋棄,實現這個功能也需要使用Redis的List結構結合lua的方式

string luaScript = @"
            local record_key = KEYS[1]
            local max_records = tonumber(ARGV[1])
            local new_record = ARGV[2]

            local current_count = redis.call('LLEN', record_key)

            if current_count >= max_records then
                redis.call('LPOP', record_key)
            end

            redis.call('RPUSH', record_key, new_record)
        ";

//這裡限制保存最新的50條數據,舊的數據則被拋棄
for (int i = 0; i < 60; i++)
{
    _ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() });
}

List

LIST很多時候會把它當做分散式隊列來使用,它提供的操作也比較靈活,咱們這裡只是封裝了幾個最簡單的操作,大致的效果如下所示

//lis入隊操作
var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1));

//list出隊操作
var str = await redisClient.ListPopAsync("list:2");
//list長度
var length = await redisClient.ListLengthAsync("list:2");
//list range操作
var list = await redisClient.ListRangeAsync("article:list", 0, 10);

總結

    本文我們通過理解Redis命令RESP協議來構建了一個簡單RedisClient的實現,方便我們更容易的理解Redis客戶端如何與Redis伺服器進行通信,這個實現也可以作為學習和理解·Redis客戶端·的一個很好的例子。當然我們的這個RedisClient這是瞭解和學習使用,很多場景我們並沒有展示,實際的項目我們還是儘量使用開源的Redis SDK, .net中常用的有StackExchange.RedisFreeRediscsredisNewLife.RedisService.Stack.Redis,其中我經常使用的是StackExchange.RedisFreeRedis整體來說效果還是不錯的。總結一下我們文章的主要內容

  • 首先我們講解了Redis命令的格式
  • 其次我們講解了Redis協議(RESP)的主要格式以及如何解析
  • 然後我們基於上面的理論簡單的封裝了一個RedisClient類來演示相關概念
  • 最後我們通過幾個示例和我用過的兩個lua來簡單的演示RedisClient類的使用

    作為新時代的職場人,我樂在探究自己感興趣的領域,對未知的事物充滿好奇,並渴望深入瞭解。對於常用的核心技術,我不僅要求自己能夠熟練運用,更追求深入理解其實現原理。面對新的技術趨勢,我決不會視而不見,而是在熟悉更多常用技術棧的同時,努力深入掌握一些重要的知識。我堅信,學無止境,每一步的進步都帶來無比的喜悅與成就感。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在 Go 語言中,零值(Zero Value)是指在聲明變數但沒有顯式賦值的情況下,變數會被自動賦予一個預設值。這個預設值取決於變數的類型,不同類型的變數會有不同的零值。零值是 Go 語言中的一個重要概念,因為它確保了變數在聲明後具有一個可預測的初始狀態,減少了未初始化變數引發的問題。 以下是一些常 ...
  • 公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 概述 在之前的教程中,我們看到了使用 RestTemplate 的 Spring Boot 微服務通信示例。 從 5.0 開始,RestTemplate處於維護模式,很快就會被棄用。因此 Spring 團隊建議使用org. ...
  • 大家好,我是棧長。 昨天 17 點多,棧長興緻來了,忙裡偷閑正在看了一把 LOL S13 淘汰賽,沒想到比賽還沒看完朋友圈就已經炸鍋了: 朋友圈有人開玩笑說,阿裡 35 歲的人是不是都被優化了?還是雙 11 後都鬆懈了?這大周末的還讓加班?讓不讓人省心點。。 這我看完也有點懵 B ,大家還記得上次的 ...
  • 眾所周知,mybatisplus提供了強大的代碼生成能力,他預設生成的常用的CRUD方法(例如插入、更新、刪除、查詢等)的定義,能夠幫助我們節省很多體力勞動 ...
  • Docker在服務端的應用中已經非常廣泛,所以服務端項目支持Docker將是必不可少的存在,此篇講述如何講一個Rust項目發佈到Docker的公共鏡像里,以供他人使用 ...
  • 1.HighLightingSystem 用於3D物體高亮顯示 在項目中的使用方法:導入插件後在需要高亮顯示的3d物體上附加Highlighter組件,在需要顯示高亮效果的攝像機上附加Highlighting Renderer組件。在代碼中調整Highlighter屬性即可控制物體高亮效果的開關、閃 ...
  • 一:背景 1. 講故事 前幾天有位朋友找到我,說他的窗體程式有卡死現象,讓我幫忙看下怎麼回事,解決這種問題就需要在卡死的時候抓一個dump下來,拿到dump之後就可以分析了。 二:為什麼會卡死 1. 觀察主線程 窗體程式的卡死,需要觀察主線程此時正在做什麼,可以用 !clrstack 命令觀察。 0 ...
  • 上篇文章講述了C#特性(Attribute)知識點,本文將介紹多線程的知識點。多線程編程是現代軟體開發中的重要組成部分,它能夠充分利用多核處理器,提高應用程式的性能和響應性。C#作為.NET開發的主要語言,提供了強大的多線程支持。本文將介紹C#多線程知識點,幫助.NET開發者更好地應對多線程編程。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...