引言 最近工作上有需要使用redis,於是便心血來潮打算自己寫一個C#客戶端。經過幾天的努力,目前該客戶端已經基本成型,下麵簡單介紹一下。 通信協議 要想自行實現redisClient,則必須先要瞭解Redis的socket能信協議。新版統一請求協議在 Redis 1.2 版本中引入, 並最終在 R ...
引言
最近工作上有需要使用redis,於是便心血來潮打算自己寫一個C#客戶端。經過幾天的努力,目前該客戶端已經基本成型,下麵簡單介紹一下。
通信協議
要想自行實現redisClient,則必須先要瞭解Redis的socket能信協議。新版統一請求協議在 Redis 1.2 版本中引入, 並最終在 Redis 2.0 版本成為 Redis 伺服器通信的標準方式。在這個協議中, 所有發送至 Redis 伺服器的參數都是二進位安全(binary safe)的。
以下是這個協議的一般形式:
*<參數數量> CR LF $<參數 1 的位元組數量> CR LF <參數 1 的數據> CR LF ... $<參數 N 的位元組數量> CR LF <參數 N 的數據> CR LF
註:命令本身也作為協議的其中一個參數來發送。舉個例子, 以下是一個命令協議的列印版本:
1 *3 2 $3 3 SET 4 $5 5 mykey 6 $7 7 myvalue
這個命令的實際協議值如下:
1 "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
稍後看到, 這種格式除了用作命令請求協議之外, 也用在命令的回覆協議中: 這種只有一個參數的回覆格式被稱為批量回覆(Bulk Reply)。統一協議請求原本是用在回覆協議中, 用於將列表的多個項返回給客戶端的, 這種回覆格式被稱為多條批量回覆(Multi Bulk Reply)。一個多條批量回覆以 *<argc>\r\n 為首碼, 後跟多條不同的批量回覆, 其中 argc 為這些批量回覆的數量。
Redis 命令會返回多種不同類型的回覆。一個狀態回覆(或者單行回覆,single line reply)是一段以 "+" 開始、 "\r\n" 結尾的單行字元串。通過檢查伺服器發回數據的第一個位元組, 可以確定這個回覆是什麼類型:
- 狀態回覆(status reply)的第一個位元組是 "+"
- 錯誤回覆(error reply)的第一個位元組是 "-"
- 整數回覆(integer reply)的第一個位元組是 ":"
- 批量回覆(bulk reply)的第一個位元組是 "$"
- 多條批量回覆(multi bulk reply)的第一個位元組是 "*"
.net Core Socket
說起socket,就不得不說IOCP了,這個方案本身就是為瞭解決多連接、高併發而設計的;但是話又說回來,任何方案都有局限性,不可能解決所有問題;這裡不去討論用在這裡是否合適,反正本人就是想這麼試一把:用一個簡單的ioc模式實現SAEA.Socket,併為此設定各種場景,反過來優化SAEA.Socket本身。下麵是一段伺服器接收連接的代碼:
1 private void ProcessAccept(SocketAsyncEventArgs args) 2 { 3 if (args == null) 4 { 5 args = new SocketAsyncEventArgs(); 6 args.Completed += ProcessAccepted; 7 } 8 else 9 { 10 args.AcceptSocket = null; 11 } 12 if (!_listener.AcceptAsync(args)) 13 { 14 ProcessAccepted(_listener, args); 15 } 16 }
項目結構
在網上找到redis的命令文檔後,本人覺的準備工作差不多了,可以初步定一下項目結構:
Core:定義的是redisclient相關最基本的業務
Interface:定義的是一些需要抽象出來的介面
Model:定義的是redis的數據模型及其請求、回覆的類型枚舉
Net:這裡就是將繼承實現SAEA.Socket而來的RedisConnection通信基礎
命令解碼器
通過前面的準備工作瞭解到redisClient的關鍵在於命令的編解碼,至於高大上演算法或redis官方演算法的實現,本人沒有去詳細瞭解,一衝動就自行實現了自定義版的解碼器。
1 public string Coder(RequestType commandName, params string[] @params) 2 { 3 _autoResetEvent.WaitOne(); 4 _commandName = commandName; 5 var sb = new StringBuilder(); 6 sb.AppendLine("*" + @params.Length); 7 foreach (var param in @params) 8 { 9 sb.AppendLine("$" + param.Length); 10 sb.AppendLine(param); 11 } 12 return sb.ToString(); 13 }
1 public ResponseData Decoder() 2 { 3 var result = new ResponseData(); 4 5 string command = null; 6 7 string error = null; 8 9 var len = 0; 10 11 switch (_commandName) 12 { 13 case RequestType.PING: 14 command = BlockDequeue(); 15 if (GetStatus(command, out error)) 16 { 17 result.Type = ResponseType.OK; 18 result.Data = "PONG"; 19 } 20 else 21 { 22 result.Type = ResponseType.Error; 23 result.Data = error; 24 } 25 break; 26 case RequestType.AUTH: 27 case RequestType.SELECT: 28 case RequestType.SLAVEOF: 29 case RequestType.SET: 30 case RequestType.DEL: 31 case RequestType.HSET: 32 case RequestType.HDEL: 33 case RequestType.LSET: 34 command = BlockDequeue(); 35 if (GetStatus(command, out error)) 36 { 37 result.Type = ResponseType.OK; 38 result.Data = "OK"; 39 } 40 else 41 { 42 result.Type = ResponseType.Error; 43 result.Data = error; 44 } 45 break; 46 case RequestType.TYPE: 47 command = BlockDequeue(); 48 if (GetStatusString(command, out string msg)) 49 { 50 result.Type = ResponseType.OK; 51 } 52 else 53 { 54 result.Type = ResponseType.Error; 55 } 56 result.Data = msg; 57 break; 58 case RequestType.GET: 59 case RequestType.GETSET: 60 case RequestType.HGET: 61 case RequestType.LPOP: 62 case RequestType.RPOP: 63 case RequestType.SRANDMEMBER: 64 case RequestType.SPOP: 65 len = GetWordsNum(BlockDequeue(), out error); 66 if (len == -1) 67 { 68 result.Type = ResponseType.Empty; 69 result.Data = error; 70 } 71 else 72 { 73 result.Type = ResponseType.String; 74 result.Data += BlockDequeue(); 75 } 76 break; 77 case RequestType.KEYS: 78 case RequestType.HKEYS: 79 case RequestType.LRANGE: 80 case RequestType.SMEMBERS: 81 result.Type = ResponseType.Lines; 82 var sb = new StringBuilder(); 83 var rn = GetRowNum(BlockDequeue(), out error); 84 if (!string.IsNullOrEmpty(error)) 85 { 86 result.Type = ResponseType.Error; 87 result.Data = error; 88 break; 89 } 90 //再嘗試讀取一次,發現有回車行出現 91 if (rn == -1) rn = GetRowNum(BlockDequeue(), out error); 92 if (!string.IsNullOrEmpty(error)) 93 { 94 result.Type = ResponseType.Error; 95 result.Data = error; 96 break; 97 } 98 if (rn > 0) 99 { 100 for (int i = 0; i < rn; i++) 101 { 102 len = GetWordsNum(BlockDequeue(), out error); 103 sb.AppendLine(BlockDequeue()); 104 } 105 } 106 result.Data = sb.ToString(); 107 break; 108 case RequestType.HGETALL: 109 case RequestType.ZRANGE: 110 case RequestType.ZREVRANGE: 111 result.Type = ResponseType.KeyValues; 112 sb = new StringBuilder(); 113 rn = GetRowNum(BlockDequeue(), out error); 114 if (!string.IsNullOrEmpty(error)) 115 { 116 result.Type = ResponseType.Error; 117 result.Data = error; 118 break; 119 } 120 if (rn > 0) 121 { 122 for (int i = 0; i < rn; i++) 123 { 124 len = GetWordsNum(BlockDequeue(), out error); 125 sb.AppendLine(BlockDequeue()); 126 } 127 } 128 result.Data = sb.ToString(); 129 break; 130 case RequestType.DBSIZE: 131 case RequestType.EXISTS: 132 case RequestType.EXPIRE: 133 case RequestType.PERSIST: 134 case RequestType.SETNX: 135 case RequestType.HEXISTS: 136 case RequestType.HLEN: 137 case RequestType.LLEN: 138 case RequestType.LPUSH: 139 case RequestType.RPUSH: 140 case RequestType.LREM: 141 case RequestType.SADD: 142 case RequestType.SCARD: 143 case RequestType.SISMEMBER: 144 case RequestType.SREM: 145 case RequestType.ZADD: 146 case RequestType.ZCARD: 147 case RequestType.ZCOUNT: 148 case RequestType.ZREM: 149 case RequestType.PUBLISH: 150 var val = GetValue(BlockDequeue(), out error); 151 if (!string.IsNullOrEmpty(error)) 152 { 153 result.Type = ResponseType.Error; 154 result.Data = error; 155 break; 156 } 157 if (val == 0) 158 { 159 result.Type = ResponseType.Empty; 160 } 161 else 162 { 163 result.Type = ResponseType.OK; 164 } 165 result.Data = val.ToString(); 166 break; 167 case RequestType.INFO: 168 var rnum = GetWordsNum(BlockDequeue(), out error); 169 if (!string.IsNullOrEmpty(error)) 170 { 171 result.Type = ResponseType.Error; 172 result.Data = error; 173 break; 174 } 175 var info = ""; 176 while (info.Length < rnum) 177 { 178 info += BlockDequeue(); 179 } 180 result.Type = ResponseType.String; 181 result.Data = info; 182 break; 183 case RequestType.SUBSCRIBE: 184 var r = ""; 185 while (IsSubed) 186 { 187 r = BlockDequeue(); 188 if (r == "message\r\n") 189 { 190 result.Type = ResponseType.Sub; 191 BlockDequeue(); 192 result.Data = BlockDequeue(); 193 BlockDequeue(); 194 result.Data += BlockDequeue(); 195 break; 196 } 197 } 198 break; 199 case RequestType.UNSUBSCRIBE: 200 var rNum = GetRowNum(BlockDequeue(), out error); 201 var wNum = GetWordsNum(BlockDequeue(), out error); 202 BlockDequeue(); 203 wNum = GetWordsNum(BlockDequeue(), out error); 204 var channel = BlockDequeue(); 205 var vNum = GetValue(BlockDequeue(), out error); 206 IsSubed = false; 207 break; 208 } 209 _autoResetEvent.Set(); 210 return result; 211 }
命令的封裝與測試
有了socket、redisCoder之後,現在就可以按照官方的redis命令來進行.net core的封裝了。本人將這些操作封裝到RedisClient、RedisDataBase兩個類中,然後又想到連接復用的問題,簡單實現了一個連接池RedisClientFactory的類。這樣一來就可以好好的來實驗一把,看看之前的設想最終能不能實現了:
1 /**************************************************************************** 2 *Copyright (c) 2018 Microsoft All Rights Reserved. 3 *CLR版本: 4.0.30319.42000 4 *機器名稱:WENLI-PC 5 *公司名稱:Microsoft 6 *命名空間:SAEA.RedisSocketTest 7 *文件名: Program 8 *版本號: V1.0.0.0 9 *唯一標識:3d4f939c-3fb9-40e9-a0e0-c7ec773539ae 10 *當前的用戶域:WENLI-PC 11 *創建人: yswenli 12 *電子郵箱:[email protected] 13 *創建時間:2018/3/17 10:37:15 14 *描述: 15 * 16 *===================================================================== 17 *修改標記 18 *修改時間:2018/3/19 10:37:15 19 *修改人: yswenli 20 *版本號: V1.0.0.0 21 *描述: 22 * 23 *****************************************************************************/ 24 using SAEA.Commom; 25 using SAEA.RedisSocket; 26 using System; 27 28 namespace SAEA.RedisSocketTest 29 { 30 class Program 31 { 32 static void Main(string[] args) 33 { 34 ConsoleHelper.Title = "SAEA.RedisSocketTest"; 35 ConsoleHelper.WriteLine("輸入ip:port連接RedisServer"); 36 37 var ipPort = ConsoleHelper.ReadLine(); 38 if (string.IsNullOrEmpty(ipPort)) 39 { 40 ipPort = "127.0.0.1:6379"; 41 } 42 RedisClient redisClient = new RedisClient(ipPort); 43 redisClient.Connect(); 44 //redisClient.Connect("wenli"); 45 46 47 var info = redisClient.Info(); 48 if (info.Contains("NOAUTH Authentication required.")) 49 { 50 while (true) 51 { 52 ConsoleHelper.WriteLine("請輸入redis連接密碼"); 53 var auth = ConsoleHelper.ReadLine(); 54 if (string.IsNullOrEmpty(auth)) 55 { 56 auth = "yswenli"; 57 } 58 var a = redisClient.Auth(auth); 59 if (a.Contains("OK")) 60 { 61 break; 62 } 63 else 64 { 65 ConsoleHelper.WriteLine(a); 66 } 67 } 68 } 69 70 //redisConnection.SlaveOf(); 71 72 //redisConnection.Ping(); 73 74 redisClient.Select(1); 75 76 //ConsoleHelper.WriteLine(redisConnection.Type("key0")); 77 78 ConsoleHelper.WriteLine("dbSize:{0}", redisClient.DBSize().ToString()); 79 80 81 RedisOperationTest(redisClient, true); 82 ConsoleHelper.ReadLine(); 83 } 84 85 private static void RedisOperationTest(object sender, bool status) 86 { 87 RedisClient redisClient = (RedisClient)sender; 88 if (status) 89 { 90 ConsoleHelper.WriteLine("連接redis伺服器成功!"); 91 92 #region key value 93 94 ConsoleHelper.WriteLine("回車開始kv插值操作..."); 95 ConsoleHelper.ReadLine(); 96 for (int i = 0; i < 1000; i++) 97 { 98 redisClient.GetDataBase().Set("key" + i, "val" + i); 99 } 100 //redisConnection.GetDataBase().Exists("key0"); 101 ConsoleHelper.WriteLine("kv插入完成..."); 102 103 ConsoleHelper.WriteLine("回車開始獲取kv值操作..."); 104 ConsoleHelper.ReadLine(); 105 106 var keys = redisClient.GetDataBase().Keys().Data.ToArray(false, "\r\n"); 107 108 foreach (var key in keys) 109 { 110 var val = redisClient.GetDataBase().Get(key); 111 ConsoleHelper.WriteLine("Get val:" + val); 112 } 113 ConsoleHelper.WriteLine("獲取kv值完成..."); 114 115 ConsoleHelper.WriteLine("回車開始開始kv移除操作..."); 116 ConsoleHelper.ReadLine(); 117 foreach (var key in keys) 118 { 119 redisClient.GetDataBase().Del(key); 120 } 121 ConsoleHelper.WriteLine("移除kv值完成..."); 122 #endregion 123 124 125 #region hashset 126 string hid = "wenli"; 127 128 ConsoleHelper.WriteLine("回車開始HashSet插值操作..."); 129 ConsoleHelper.ReadLine(); 130 for (int i = 0; i < 1000; i++) 131 { 132 redisClient.GetDataBase().HSet(hid, "key" + i, "val" + i); 133 } 134 ConsoleHelper.WriteLine("HashSet插值完成..."); 135 136 ConsoleHelper.WriteLine("回車開始HashSet插值操作..."); 137 ConsoleHelper.ReadLine(); 138 var hkeys = redisClient.GetDataBase().GetHKeys(hid).Data.ToArray(); 139 foreach (var hkey in hkeys) 140 { 141 var val = redisClient.GetDataBase().HGet(hid, hkey); 142 ConsoleHelper.WriteLine("HGet val:" + val.Data); 143 } 144 145 var hall = redisClient.GetDataBase().HGetAll("wenli"); 146 ConsoleHelper.WriteLine("HashSet查詢完成..."); 147 148 ConsoleHelper.WriteLine("回車開始HashSet移除操作..."); 149 ConsoleHelper.ReadLine(); 150 foreach (var hkey in hkeys) 151 { 152 redisClient.GetDataBase().HDel(hid, hkey); 153 } 154 ConsoleHelper.WriteLine("HashSet移除完成..."); 155 156 157 #endregion 158 159 160 //redisConnection.GetDataBase().Suscribe((c, m) => 161 //{ 162 // ConsoleHelper.WriteLine("channel:{0} msg:{1}", c, m); 163 // redisConnection.GetDataBase().UNSUBSCRIBE(c); 164 //}, "c39654"); 165 166 167 ConsoleHelper.WriteLine("測試完成!"); 168 } 169 else 170 { 171 ConsoleHelper.WriteLine("連接失敗!"); 172 } 173 } 174 } 175 }View Code
經過上面的代碼測試,使用redis-cli工具進行monitor命令監控發現——搞定了!另外源碼本人已發到github上面了,SAEA.RedisSocket的詳細可查看:https://github.com/yswenli/SAEA/tree/master/Src/SAEA.RedisSocket
轉載請標明本文來源:http://www.cnblogs.com/yswenli/p/8608661.html
更多內容歡迎star作者的github:https://github.com/yswenli/SAEA
如果發現本文有什麼問題和任何建議,也隨時歡迎交流~