c#通過Redis實現輕量級消息組件

来源:https://www.cnblogs.com/yanpeng19940119/archive/2019/09/28/11603865.html
-Advertisement-
Play Games

最近在開發一個輕量級ASP.NET MVC開發框架,需要加入日誌記錄,郵件發送,簡訊發送等功能,為了保持模塊的獨立性,所以需要通過消息通信的方式進行處理,為了保持框架在部署,使用,二次開發過程中的簡易便捷性,所以沒有選擇傳統的MQ,而是基於Redis的訂閱發佈實現一個系統內部消息組件,話不多說,上碼 ...


最近在開發一個輕量級ASP.NET MVC開發框架,需要加入日誌記錄,郵件發送,簡訊發送等功能,為了保持模塊的獨立性,所以需要通過消息通信的方式進行處理,為了保持框架在部署,使用,二次開發過程中的簡易便捷性,所以沒有選擇傳統的MQ,而是基於Redis的訂閱發佈實現一個系統內部消息組件,話不多說,上碼!

數據結構定義

消息實體包含幾個部分,訂閱通道名稱,信息頭,信息體,信息差異化額外信息字典,信息頭主要包含消息標識,消息日期,信息體包含信息內容,信息實體類型等

   public class Message
    {
        public string MessageChannel { set; get; }
        public MessageHead @MessageHead { set; get; }
        public MessageBody @MessageBody { set; get; }

        [JsonExtensionData]
        public Dictionary<string,Object> @MessageExtra { set; get; }

        public Message()
        {

        }

        public void AddExtra(string Name, string Value)
        {
            if (@MessageExtra == null)
            {
                @MessageExtra = new Dictionary<string, object>();
            }
            @MessageExtra.Add(Name, Value);
        }

        public Object GetExtra(string Name)
        {
            return @MessageExtra[Name];
        }
    }

    public class MessageHead
    {
        public string MessageID { set; get; }
        public DateTime MessageDate { set; get; }

        public MessageHead()
        {
            MessageID = CommonUtil.CreateCommonGuid();
            MessageDate = DateTime.Now;
        }
    }

    public class MessageBody
    {
        public string MessageJsonContent { set; get; }
        public Type MessageMapperType { set; get; }
    }

註:因為消息訂閱發佈傳遞過程中,我是通過Json序列化傳輸的,使用過程中可能需要一些額外的鍵值對信息,這裡在對象中定義的是Dictinary對象,但是Dictinary本身是不支持序列化的,所以需要加上註解JsonExtensionData

訂閱通道聲明

我們需要達到的效果是,在系統啟動時,所有消息通道可以根據系統中的應用自動訂閱,這裡就需要一個註解來標識我們的訂閱通道接收消息的實現類

[AttributeUsage(AttributeTargets.Class)]
    public class MessageChanelAttribute : Attribute
    {
        private string _ChannleName;
        public string ChannelName
        {
            get
            {
                return this._ChannleName;
            }
            set
            {
                this._ChannleName = value;
            }

        }
    }

消息的個性化策略處理

Redis的三方庫我這裡使用的是StackExchange.Redis.dll,在消息訂閱時,需要為Channel指定接收到消息時的處理委托,我們在自動訂閱的過程中肯定也要收集好各類消息處理類並與Channel一一對應,這時候我們就需要一個基類FastDefaultMessageHandler,我們的具體的消息處理類繼承自FastDefaultMessageHandler,重寫處理方法即可

 [Component]
    [MessageChanelAttribute(ChannelName = "DefaultMessage")]
    public class FastDefaultMessageHandler : IFastMessageHandle
    {
        [AutoWired]
        public DBUtil @DBUtil;

        public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
        {
            FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
            try
            {
                if (!CheckMessageIsConsume(Entity))
                {
                    this.CustomHandle(Entity);
                }
            }
            catch (Exception e)
            {
                StringBuilder ExceptionLog = new StringBuilder();
                ExceptionLog.AppendFormat("異常Message所屬Channel:{0}", Entity.MessageChannel + Environment.NewLine);
                ExceptionLog.AppendFormat("異常Message插入時間:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("異常Message內容:{0}", Message + Environment.NewLine);
                ExceptionLog.AppendFormat("異常信息:{0}", e.Message + Environment.NewLine);
                LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
                MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
            }
            finally
            {
                MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
            }

        }

        public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
        {

        }

        public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
        {
            return false;
        }
    }

其中的HandleMessage方法就是我們在訂閱Channel時對應的委托,會調用類中的CustomHandle的虛方法,子類繼承重寫該方法就會基於多態進行策略調用,CheckMessageIsConsume方法是用於確認消息是否重覆消費的,也可以被重寫,下麵看一個訪問日誌類的實例,使用MessageChanelAttribute標註聲明該實現類需要訂閱發佈的Channel名稱為Visit,CustomHandle方法中實現了插入資料庫操作,CheckMessageIsConsume方法判斷該條日誌數據是否已消費(已經存在於資料庫)

    [MessageChanelAttribute(ChannelName = "Visit")]
    public class VisitLog : FastDefaultMessageHandler
    {
        public override void CustomHandle(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            @DBUtil.Insert(LogEntity);
            base.CustomHandle(Message);
        }

        public override bool CheckMessageIsConsume(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
            if (Row.IsExist())
            {
                return true;
            }
            else
            {
                return false;
            }
        }
    }

消息自動訂閱

我們希望系統在啟動時就尋找出定義好Channel和實現類,自動實現訂閱,這裡就需要用到IOC容器,啟動系統時將所有的消息處理類放入容器中,在自動訂閱時全部取出來,根據消息處理類中聲明的Channel名稱進行自動訂閱

  public void Init()
        {
            List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
            foreach (Type HandlerType in HandlerTypeList)
            {
                MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
                RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
            }
        }

註:

1.這裡的IOC容器是我自己實現的,地址:https://gitee.com/grassprogramming/FastIOC,大家可以用AutoFac代替

2.RedisUtil是對StackExchange.Redis.dll封裝的處理類,地址:https://gitee.com/grassprogramming/FastUtil

消息發送

消息只需要調用Redis的發佈方法即可,將Channel名稱與定義好的數據實體類傳入,序列化為Json

     public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null)
        {
            FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
            MessageEntity.MessageChannel = ChannleName;
            MessageHead Head = new MessageHead();
            MessageBody Body = new MessageBody();
            Body.MessageMapperType = typeof(T);
            Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
            MessageEntity.MessageHead = Head;
            MessageEntity.MessageBody = Body;
            if (ExtraData != null)
            {
                foreach (var item in ExtraData)
                {
                    MessageEntity.AddExtra(item.Key, item.Value);
                }
            }
            RedisUtil.Publish(ChannleName, MessageEntity);
            MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
        }

消息確認與存儲

Redis作訂閱發佈模式作為消息組件的問題有兩方面

問題:消息消費完沒有確認機制

解決方案

基於Redis的Hash存儲方式建立一個消息存儲欄位,在發送消息時拷貝到消息Hash字典中,消費完畢後再刪除,對應SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本質就是對Hash字典的增加與刪除功能

問題:消息處理端掛了再次重啟消息會丟失

解決方案

確認機制已經保證了消息即使沒有被消費完但是處理端宕機消息也不會丟失,需要註意的是,消息沒有丟失僅僅是Hash字典中有存儲,但是消息通道中不存在了,所以我們在系統每次啟動時掃描這個Hash字典,重新發佈消息到Channel,這樣可能導致重覆消費,所以需要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判斷,同時消息處理者本身處理異常我們也需要記錄下來,比如發簡訊供應商介面有問題,消息處理異常會進入Redis的ChannelException通道,我們可以根據需求實現一個可視化界面決定是否通過手動恢復

最後

Message組件相關代碼地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message

存在不足問題:如果消息是單純記錄日誌問題,沒辦法確認消息是否消費了

如果大家有什麼好的建議,可留言一起交流學習,共同進步


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一.思路分析 redis數據傳輸遵循resp協議,只需要按照resp協議並通過socket傳遞數據到redis伺服器即可 resp數據格式: 二.具體實現 三.運行結果 ...
  • [TOC] 概述 首先同步下項目概況: 上篇文章分享了,路由中間件 Jaeger 鏈路追蹤(理論篇),這篇文章咱們接著分享:路由中間件 Jaeger 鏈路追蹤(實戰篇)。 這篇文章,確實讓大家久等了,主要是裡面有一些技術點都是剛剛研究的,沒有存貨。 先看下咱們要實現的東西: API 調用了 5 個服 ...
  • [ TOC ] 0. 前言 自上一篇文章《用python怎麼telnet到網路設備》,簡單使用了telnetlib庫給大家演示了下,但是,現實環境中仍不建議去使用telnet。 SSH(Secure Shell)協議也是屬於TCP/IP協議族裡的一種,埠號22,可以代替telnet來遠程管理的一種 ...
  • 簡介 nginx是一款輕量級的web伺服器,它是由俄羅斯的程式設計師伊戈爾·西索夫所開發。 nginx相比於Tomcat性能十分優秀,能夠支撐5w的併發連接(而Tomcat只能支撐200-400),並且nginx對CPU和記憶體的消耗十分的低,運行十分穩定。 nginx的作用非常多,但我們通常把它作為 ...
  • 基本數據類型 java 是強類型語言,在 java 中存儲的數據都是有類型的,而且必須在編譯時就確定其類型。 基本數據類型變數存儲的是數據本身,而引用類型變數存的是數據的空間地址。 基本類型轉換 自動類型轉換 把一個表數範圍小的數值或變數直接賦給另一個表數範圍大的變數時,系統將會進行自動類型轉換,否 ...
  • 一,不使用藍圖,自己分文件 目錄結構 app.py init.py user.py order.py 註意點:只有是包的時候才能from.然後import 相對路徑進行導入 缺點 容易發生迴圈導入問題 二.使用藍圖之中小型系統 "詳見代碼點擊可以下載" 目錄結構: __init_.py manage ...
  • 知識點 1. 初始化 :每一個flask程式都必須創建一個程式實例,遵循WSGI(Web Server Gateway interface)協議,把請求 flask Obj; 創建實例: Flask 類的構造函數只有一個必須指定的參數,即程式主模塊或包的名字。在大多數程式中,Python 的 __n ...
  • Flask框架整個流程源碼解讀 一.總的流程 運行Flask其本質是運行Flask對象中的\_\_call\_\_,而 本質調用wsgi_app的方法 二.具體流程 1.ctx = self.request_context(environ) environ 請求相關的,ctx現在是包含request ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...