Lind.DDD.LindMQ~關於持久化到Redis的消息格式

来源:http://www.cnblogs.com/lori/archive/2016/12/02/6125690.html
-Advertisement-
Play Games

回到目錄 關於持久化到Redis的消息格式,主要是說在Broker上把消息持久化的過程中,需要存儲哪些類型的消息,因為我們的消息是分topic的,而每個topic又有若幹個queue組成,而我們的topic和queue由於redis存儲結構的原因,我們需要將它們分區對應存儲一下,而不能像關係型資料庫 ...


回到目錄

關於持久化到Redis的消息格式,主要是說在Broker上把消息持久化的過程中,需要存儲哪些類型的消息,因為我們的消息是分topic的,而每個topic又有若幹個queue組成,而我們的topic和queue由於redis存儲結構的原因,我們需要將它們分區對應存儲一下,而不能像關係型資料庫那樣靈活,所以要額外設計幾個數據結構來存儲它們。

一 Topic字典

二 Topic對應的Queue字典

三 Queue里的消息

四 某個客戶端對應某個Queue的消費進度

以上四個結構是我們要說的,它們會在推消息,拉消息,刪消息時用到,下麵一一介紹一下,講的不好不對的地方,歡迎大家為大叔留言。

一 Topic字典

主要存儲每個topic,它是一個set集合,redis的我集合類型之一,每個key是唯一的LindMq_Topic,值value就是我們客戶端傳來的具體topic的名字,這主要是在刪除過期的消息時用的,主是作用是遍歷所有的topic消息類型,這樣我們在刪除消息時,就可以把所有註冊的topic都找到了,最後把過期的刪除,預設消息存活周期是一天。

刪除過期的消息代碼如下

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
  foreach (var topic in topicList)
   {
     var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);
         foreach (var queue in queueList)
           {
            var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");
              RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);
            }
    }

二 Topic對應的Queue字典

我們知道,為了加大redis的併發量和吞吐量,我們會把大數據鍵值對設計成多個鍵,這就像是一個集群環境的sharing,就是將大數據進行分片,而我們的分片規則是採用按對象取模的方式,模數可以自己設置,比較我設置8,那說明我的隊列(分片)最多可以被分為8個,這個大家可以去做測試,挺有意思的,比隨機數來個直接!而這一次redis里的鍵就是某個topic,而值就是我們的topic加上隊列索引,例如你的topic是zzl,那麼隊列里的鍵可能就是zzl0,zzl1,zzl2...

三 Queue里的消息

我們的生產者將消息發送到broker里,然後於broker將消息持久化到具體的存儲介質里,當然這裡我們用的是Redis,在存儲在redis里時,我們的具體隊列的鍵是有尾碼的,這主要用於消息的回收,因為我們打算1天回收一次消息,所以我們的消息尾碼是個日期變數,當然精確到天就可以了,它可以是這樣鍵名LindMQ_order_Paid4_20161202,每個隊列都有自己的尾碼,我們在清除消息時也就有了方法了。我們的隊列存儲結構是比較特殊的sortedSet ,就是可排序的集合,它有權重的概念,我們剛好可以使用這個特性來記錄客戶端的消費進度,因為我們的權重值在一個redis鍵/值對里是唯一的。

下麵代碼選自Push入隊列的代碼片斷,分享給大家

       //存儲當前Topic
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);

            //要存儲到哪個隊列
            body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
            var dataKey = body.Topic + body.QueueId;
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);

            //記錄偏移
            var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
            body.QueueOffset = offset + 1;

            //存儲消息
            RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
                GetRedisDataKey(dataKey),
                Utils.SerializeMemoryHelper.SerializeToJson(body),
                score: body.QueueOffset);

四 某個客戶端對應某個Queue的消費進度

消費進度是一個很麻煩的問題,生產者的消息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出隊列後就消失了,這是不靠譜的,萬一消失失敗了,也會造成消息的丟失!下麵我們主要看一下消費進度的存儲,它是一個Hash集合,其中redis的鍵名是LindMQ_ConsumerOffset,而value是一個hash對象,hash里的key是當前隊列名+消費者IP地址的hashcode值,hash里的value是這個消費者(客戶端)的消費進度(Queue里的權重,Queue的存儲結構是一個sortedSet)。

客戶端消費的測試代碼

            #region Client-LindMQ
            var consumer = new ConsumerSetting
            {
                BrokenName = "test",
                BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),
                Callback = new Dictionary<string, Action<MessageBody>>() { 
                {"zzl",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(1000);
                }},
                {"zhz",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(2000);
                }}
                }
            };
            var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
            consumerClient.Start();
            #endregion

客戶端消費的測試結果

好了,到這裡我們的LindMQ里數據存儲結構的內容就講完了,主要使用了redis里的set,sortedSet,hash等數據結構,在設計過程中,使用了分片(Sharing)的概念,當然也是借鑒了mongodb和redis集群的設計理念,同時借鑒了方雪華老兄的EQueue設計理念,在這裡和他們說一聲:謝謝!

感謝各位對Lind的支持!

回到目錄

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.簡化Java開發 Spring是一個開源框架,它的根本使命在於簡化java開發。為了降低java開發的複雜性,Spring採取了以下4種關鍵策略: 1.基於POJO的輕量級和最小侵入性編程; 有很多框架強迫應用繼承它們的類或實現它們的介面從而導致應用與框架綁死,而基於Spring構建的應用通常沒 ...
  • STL的pair,有兩個值,可以是不同的類型。 template struct pair; 註意,pair在頭文件utility中,不要include。(一個錯誤是 include ) 成員類型 first_type first的類型 second_type second的類型 成員變數 first... ...
  • 解釋如下: content 中需要被替換的就是{}中的參數,array數組中存放的是對應的要替換的參數;使用MessageFormat方法的時候,需要要將這些參數的個數匹配正確,並且數序要指定,否則匹配出錯。這樣就實現了參數的替換。很簡單,也很死板。 MessageFormat:出自java.tex ...
  • 在所有編程語言領域,我想字元串應該是地球上最常用的表達手段了吧。 在java的世界里,String是作為類出現的,核心的一個域就是一個char數組,內部就是通過維護一個不可變的char數組,來向外部輸出的。 這是jdk一段String類定義,首先類是final,表明類不可被繼承;核心域是privat ...
  • 1.package 的用途,解決了什麼問題 提供類的命名空間,解決類的命名衝突,類文件管理問題 2.使用舉例 2.1 自測代碼 (1) package 必須做為源文件的第一條非註釋語句 (2) 一個源文件只能有一個包 (3) 沒有顯示指定則處於預設包下 (4) 同包下可自由訪問 1 package ...
  • 前言 項目框架主要是spring,持久層框架沒有用mybtis,用的是spring 的jdbc; 業務需求:給應用添加領域(一個領域包含多個應用,一個應用可能屬於多個領域,一般而言一個應用只屬於一個領域),要求是給應用添加領域的時候,先將該應用已有的領域都刪除,之後再將選中的領域添加到資料庫; 為了 ...
  • 雲計算下PAAS的解析一 PaaS是Platform-as-a-Service的縮寫,意思是平臺即服務。 把伺服器平臺作為一種服務提供的商業模式。通過網路進行程式提供的服務稱之為SaaS(Software as a Service),而雲計算時代相應的伺服器平臺或者開發環境作為服務進行提供就成為了P... ...
  • Nginx ("engine x") 是一個高性能的HTTP和反向代理伺服器,也是一個IMAP/POP3/SMTP伺服器。Nginx是由Igor Sysoev為俄羅斯訪問量第二的Rambler.ru站點開發的,第一個公開版本0.1.0發佈於2004年10月4日。其將源代碼以類BSD許可證的形式發佈, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...