回到目錄 關於持久化到Redis的消息格式,主要是說在Broker上把消息持久化的過程中,需要存儲哪些類型的消息,因為我們的消息是分topic的,而每個topic又有若幹個queue組成,而我們的topic和queue由於redis存儲結構的原因,我們需要將它們分區對應存儲一下,而不能像關係型資料庫 ...
關於持久化到Redis的消息格式,主要是說在Broker上把消息持久化的過程中,需要存儲哪些類型的消息,因為我們的消息是分topic的,而每個topic又有若幹個queue組成,而我們的topic和queue由於redis存儲結構的原因,我們需要將它們分區對應存儲一下,而不能像關係型資料庫那樣靈活,所以要額外設計幾個數據結構來存儲它們。
一 Topic字典
二 Topic對應的Queue字典
三 Queue里的消息
四 某個客戶端對應某個Queue的消費進度
以上四個結構是我們要說的,它們會在推消息,拉消息,刪消息時用到,下麵一一介紹一下,講的不好不對的地方,歡迎大家為大叔留言。
一 Topic字典
主要存儲每個topic,它是一個set集合,redis的我集合類型之一,每個key是唯一的LindMq_Topic,值value就是我們客戶端傳來的具體topic的名字,這主要是在刪除過期的消息時用的,主是作用是遍歷所有的topic消息類型,這樣我們在刪除消息時,就可以把所有註冊的topic都找到了,最後把過期的刪除,預設消息存活周期是一天。
刪除過期的消息代碼如下
var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY); foreach (var topic in topicList) { var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic); foreach (var queue in queueList) { var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd"); RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey); } }
二 Topic對應的Queue字典
我們知道,為了加大redis的併發量和吞吐量,我們會把大數據鍵值對設計成多個鍵,這就像是一個集群環境的sharing,就是將大數據進行分片,而我們的分片規則是採用按對象取模的方式,模數可以自己設置,比較我設置8,那說明我的隊列(分片)最多可以被分為8個,這個大家可以去做測試,挺有意思的,比隨機數來個直接!而這一次redis里的鍵就是某個topic,而值就是我們的topic加上隊列索引,例如你的topic是zzl,那麼隊列里的鍵可能就是zzl0,zzl1,zzl2...
三 Queue里的消息
我們的生產者將消息發送到broker里,然後於broker將消息持久化到具體的存儲介質里,當然這裡我們用的是Redis,在存儲在redis里時,我們的具體隊列的鍵是有尾碼的,這主要用於消息的回收,因為我們打算1天回收一次消息,所以我們的消息尾碼是個日期變數,當然精確到天就可以了,它可以是這樣鍵名LindMQ_order_Paid4_20161202,每個隊列都有自己的尾碼,我們在清除消息時也就有了方法了。我們的隊列存儲結構是比較特殊的sortedSet ,就是可排序的集合,它有權重的概念,我們剛好可以使用這個特性來記錄客戶端的消費進度,因為我們的權重值在一個redis鍵/值對里是唯一的。
下麵代碼選自Push入隊列的代碼片斷,分享給大家
//存儲當前Topic RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic); //要存儲到哪個隊列 body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT); var dataKey = body.Topic + body.QueueId; RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey); //記錄偏移 var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey)); body.QueueOffset = offset + 1; //存儲消息 RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd( GetRedisDataKey(dataKey), Utils.SerializeMemoryHelper.SerializeToJson(body), score: body.QueueOffset);
四 某個客戶端對應某個Queue的消費進度
消費進度是一個很麻煩的問題,生產者的消息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出隊列後就消失了,這是不靠譜的,萬一消失失敗了,也會造成消息的丟失!下麵我們主要看一下消費進度的存儲,它是一個Hash集合,其中redis的鍵名是LindMQ_ConsumerOffset,而value是一個hash對象,hash里的key是當前隊列名+消費者IP地址的hashcode值,hash里的value是這個消費者(客戶端)的消費進度(Queue里的權重,Queue的存儲結構是一個sortedSet)。
客戶端消費的測試代碼
#region Client-LindMQ var consumer = new ConsumerSetting { BrokenName = "test", BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406), Callback = new Dictionary<string, Action<MessageBody>>() { {"zzl",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(1000); }}, {"zhz",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(2000); }} } }; var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer }); consumerClient.Start(); #endregion
客戶端消費的測試結果
好了,到這裡我們的LindMQ里數據存儲結構的內容就講完了,主要使用了redis里的set,sortedSet,hash等數據結構,在設計過程中,使用了分片(Sharing)的概念,當然也是借鑒了mongodb和redis集群的設計理念,同時借鑒了方雪華老兄的EQueue設計理念,在這裡和他們說一聲:謝謝!
感謝各位對Lind的支持!