.NET ActiveMQ類庫

来源:http://www.cnblogs.com/aning2015/archive/2017/10/13/7659957.html
-Advertisement-
Play Games

ActiveMQ .NET類庫 ActiveMQ是一種開源的,實現了JMS規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級消息通信。 0. 準備 使用Nuget管理控制台安裝最新版 Apache.NMS.ActiveMQ PM Install Package ...


ActiveMQ .NET類庫

ActiveMQ是一種開源的,實現了JMS規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級消息通信。

0. 準備

使用Nuget管理控制台安裝最新版Apache.NMS.ActiveMQ

    PM> Install-Package Apache.NMS.ActiveMQ

1. IMessageQueue 隊列介面

    /// <summary>
    /// 消息隊列介面
    /// </summary>
    public interface IMessageQueue
    {

        /// <summary>
        /// 打開連接
        /// </summary>
        void Open();

        /// <summary>
        /// 關閉連接
        /// </summary>
        void Close();
    }

2. ActiveMQ 抽象基類

/// <summary>
/// ActiveMQ
/// </summary>
public abstract class ActiveMQ
{
    #region 監聽連接對象
    protected IConnection _connection;
    protected ISession _session;
    protected IMessageConsumer _consumer;
    #endregion

    /// <summary>
    /// 連接地址
    /// </summary>
    public string BrokerUri { get; set; }

    /// <summary>
    /// 用於登錄的用戶名,必須和密碼同時指定
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 用於登錄的密碼,必須和用戶名同時指定
    /// </summary>
    public string Password { get; set; }

    /// <summary>
    /// 隊列名稱
    /// </summary>
    public string QueueName { get; set; }

    /// <summary>
    /// 指定使用隊列的模式
    /// </summary>
    public MQMode MQMode { get; set; }
}

隊列模式:

/// <summary>
/// 隊列模式
/// </summary>
public enum MQMode
{
    /// <summary>
    /// 隊列,點對點模式。
    /// 使用此模式。一個生產者向隊列存入一條消息之後,只有一個消費者能觸發消息接收事件。
    /// </summary>
    Queue,

    /// <summary>
    /// 主題,發佈者/訂閱模式。
    /// 使用此模式,一個生產者向隊列存入一條消息之後,所有訂閱當前的主題的消費者都能觸發消息接收事件。
    /// 使用此模式,必須先創建消費者,再創建生產者。
    /// </summary>
    Topic
}

3. ActiveMQProducer 生產者

/// <summary>
/// ActiveMQ生產者,打開連接,向指定隊列中發送數據
/// </summary>
public class ActiveMQProducer : ActiveMQ, IMessageQueue, IDisposable
{
    /// <summary>
    /// 隊列緩存字典
    /// </summary>
    private ConcurrentDictionary<string, IMessageProducer> _concrtProcuder = new ConcurrentDictionary<string, IMessageProducer>();

    /// <summary>
    /// 打開連接
    /// </summary>
    public void Open()
    {
        if (string.IsNullOrWhiteSpace(this.BrokerUri))
            throw new MemberAccessException("未指定BrokerUri");
        if (string.IsNullOrWhiteSpace(this.QueueName))
            throw new MemberAccessException("未指定QueueName");

        var factory = new ConnectionFactory(this.BrokerUri);
        if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
            _connection = factory.CreateConnection();
        else
            _connection = factory.CreateConnection(this.UserName, this.Password);
        _connection.Start();
        _session = _connection.CreateSession();

        CreateProducer(this.QueueName);
    }


    /// <summary>
    /// 關閉連接
    /// </summary>
    public void Close()
    {
        IMessageProducer _p = null;
        foreach (var p in this._concrtProcuder)
        {
            if (this._concrtProcuder.TryGetValue(p.Key, out _p))
            {
                _p?.Close();
            }
        }
        this._concrtProcuder.Clear();

        _session?.Close();
        _connection?.Close();
    }

    /// <summary>
    /// 向隊列發送數據
    /// </summary>
    /// <typeparam name="T">數據類型</typeparam>
    /// <param name="body">數據</param>
    public void Put<T>(T body)
    {
        Send(this.QueueName, body);
    }

    /// <summary>
    /// 向指定隊列發送數據
    /// </summary>
    /// <typeparam name="T">數據類型</typeparam>
    /// <param name="body">數據</param>
    /// <param name="queueName">指定隊列名</param>
    public void Put<T>(T body, string queueName)
    {
        Send(queueName, body);
    }

    /// <summary>
    /// 創建隊列
    /// </summary>
    /// <param name="queueName"></param>
    private IMessageProducer CreateProducer(string queueName)
    {
        if (_session == null)
        {
            Open();
        }

        //創建新生產者
        Func<string, IMessageProducer> CreateNewProducter = (name) =>
        {
            IMessageProducer _newProducer = null;
            switch (MQMode)
            {
                case MQMode.Queue:
                    {
                        _newProducer = _session.CreateProducer(new ActiveMQQueue(name));
                        break;
                    }
                case MQMode.Topic:
                    {
                        _newProducer = _session.CreateProducer(new ActiveMQTopic(name));
                        break;
                    }
                default:
                    {
                        throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
                    }
            }
            return _newProducer;
        };

        return this._concrtProcuder.GetOrAdd(queueName, CreateNewProducter);
    }

    /// <summary>
    /// 發送數據
    /// </summary>
    /// <param name="queueName">隊列名稱</param>
    /// <typeparam name="T"></typeparam>
    /// <param name="body">數據</param>
    private void Send<T>(string queueName, T body)
    {
        var producer = CreateProducer(queueName);
        IMessage msg;
        if (body is byte[])
        {
            msg = producer.CreateBytesMessage(body as byte[]);
        }
        else if (body is string)
        {
            msg = producer.CreateTextMessage(body as string);
        }
        else
        {
            msg = producer.CreateObjectMessage(body);
        }
        if (msg != null)
        {
            producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
        }
    }

    /// <summary>
    /// 執行與釋放或重置非托管資源相關的應用程式定義的任務。
    /// </summary>
    public void Dispose()
    {
        this.Close();
    }
}

4. ActiveMQConsumer 消費者

/// <summary>
/// ActiveMQ消費者,打開連接,監聽隊列,接收到數據之後觸發回調
/// </summary>
public class ActiveMQConsumer : ActiveMQ, IMessageQueue, IDisposable
{
    /// <summary>
    /// 接收到數據回調,ActiveMQ原生IMessage類型
    /// </summary>
    public Action<IMessage> OnMessageReceived { get; set; }

    /// <summary>
    /// 接收到消息回調(業務數據對象, 根據自己的業務靈活替換)
    /// </summary>
    public Action<DataCenterMessage> OnDataCenterMessageReceived { get; set; }

    /// <summary>
    /// 打開連接
    /// </summary>
    public void Open()
    {
        if (string.IsNullOrWhiteSpace(this.BrokerUri))
            throw new MemberAccessException("未指定BrokerUri");
        if (string.IsNullOrWhiteSpace(this.QueueName))
            throw new MemberAccessException("未指定QueueName");

        var factory = new ConnectionFactory(this.BrokerUri);
        if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
            _connection = factory.CreateConnection();
        else
            _connection = factory.CreateConnection(this.UserName, this.Password);
        _connection.Start();
        _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

        switch (MQMode)
        {
            case MQMode.Queue:
                {
                    _consumer = _session.CreateConsumer(new ActiveMQQueue(this.QueueName));
                    break;
                }
            case MQMode.Topic:
                {
                    _consumer = _session.CreateConsumer(new ActiveMQTopic(this.QueueName));
                    break;
                }
            default:
                {
                    throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
                }
        }
    }

    /// <summary>
    /// 關閉連接
    /// </summary>
    public void Close()
    {
        _consumer?.Close();
        _session?.Close();
        _connection?.Close();
    }

    /// <summary>
    /// 開始監聽
    /// </summary>
    public void StartListen()
    {
        if (_consumer == null)
        {
            Open();
        }

        _consumer.Listener += new MessageListener(msg =>
        {
            if (OnMessageReceived != null)
                OnMessageReceived(msg);

            //轉換為業務需要的數據對象
            if (OnDataCenterMessageReceived != null)
            {
                var objectMessage = msg as ActiveMQObjectMessage;
                if (objectMessage != null)
                {
                    var dataCenterMsg = objectMessage.Body as DataCenterMessage;
                    if (dataCenterMsg != null)
                    {
                        OnDataCenterMessageReceived(dataCenterMsg);
                    }
                }
            }
        });
    }

    /// <summary>
    /// 執行與釋放或重置非托管資源相關的應用程式定義的任務。
    /// </summary>
    public void Dispose()
    {
        this.Close();
    }
}

5. 擴展方法

/// <summary>
/// 擴展方法類
/// </summary>
public static class ExtendMethods
{
    /// <summary>
    /// 將對象轉換為bytes
    /// </summary>
    /// <param name="obj"></param>
    /// <returns>bytes</returns>
    public static byte[] ToBytes<T>(this T obj) where T : class
    {
        if (obj == null)
            return null;
        using (var ms = new MemoryStream())
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(ms, obj);
            return ms.GetBuffer();
        }
    }

    /// <summary>
    /// 將bytes轉換為對象
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="bytes"></param>
    /// <returns></returns>
    public static T ToObject<T>(this byte[] bytes) where T : class
    {
        if (bytes == null)
            return default(T);
        using (var ms = new MemoryStream(bytes))
        {
            var formatter = new BinaryFormatter();
            return formatter.Deserialize(ms) as T;
        }
    }
}

6. 使用示例:

        #region 生產者
        var producer = new ActiveMQProducer();
        producer.BrokerUri = @"tcp://127.0.0.1:61616/";
        producer.UserName = "admin";
        producer.Password = "admin";
        producer.QueueName = "TestQueueName";
        producer.MQMode = MQMode.Queue;

        producer.Open();
        var message = new DataCenterMessage()
        {
            //初始化業務數據對象...
        };

        //發送到隊列, Put對象類必須使用[Serializable]註解屬性
        producer.Put(message);
        #endregion

        #region 消費者
        var consumer = new ActiveMQConsumer();
        consumer.BrokerUri = @"tcp://127.0.0.1:61616/";
        consumer.UserName = "admin";
        consumer.Password = "admin";
        consumer.QueueName = "TestQueueName";
        consumer.MQMode = MQMode.Queue;

        consumer.OnMessageReceived = (msg) =>
        {
            var bytesMessage = msg as ActiveMQBytesMessage;
            if (bytesMessage != null)
            {
                var buffer = new byte[bytesMessage.BodyLength];
                bytesMessage.WriteBytes(buffer);
                var result = buffer.ToObject<DataCenterMessage>();
                Debug.WriteLine(result);
            }
        };

        consumer.OnDataCenterMessageReceived = (msg) =>
        {
            Debug.Write(msg);
        };

        consumer.Open();
        consumer.StartListen();

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 假設有段代碼是這樣的,需要判斷角色裝備哪些武器,然後根據角色的這個屬性來進行其它的一些操作。這時候我們可以用枚舉來標記角色可以裝備的武器。 這就有個問題了,當角色可以裝備多種武器的時候,難道要用多個變數去表示他可以使用的武器嗎?又或者是在Equipment類型中加入新的成員,刀和箭,刀和箭和弓... ...
  • 前言:審批流程中常見的都是人工類型任務,但是也會有一些自動化的任務需要定時觸發。因此,引擎框架中需要解決掉兩個問題:選擇合適的任務調度框架,集成新的任務調度模塊。 1. 任務調度框架選擇 Hangfire 是一個開源的.NET任務調度框架,目前1.6+版本已支持.NET Core。 基於隊列的任務處 ...
  • is和as is關鍵字可以確定對象實例或表達式結果是否可轉換為指定類型。基本語法: 如果滿足以下條件,則 is 語句為 true: expr 是與 type 具有相同類型的一個實例。 expr 是派生自 type 的類型的一個實例。 換言之,expr 結果可以向上轉換為 type 的一個實例。 ex ...
  • 許久沒用C#寫程式。聽說進來發生大事,.NetCore2.0發佈了,於是便學習了下,本站也應運而生。 大多數的地方按照官方的文檔起步走就可以了,這裡談談遇到的幾個坑。 首先,本站是基於ASP.NetCore2.0和EntityFrameWorkCore.Sqlite的,前端使用了layui,搭建於C ...
  • WinForm預覽Office文檔 使用WinForm, WPF, Office組件 原理:使用Office COM組件將Word,Excel轉換為XPS文檔, 將WPF的 控制項寄宿到WinForm中, 實現預覽. 1. 新建WinForm項目 2. 新建WPF用戶控制項, 註意是WPF控制項 3. 編 ...
  • 根據可變性的規則,只有介面和委托可以標記可變性。且只有類型參數為引用類型時才可以利用可變性。 不變性:泛型類型的參數不能改變,這表示一個創建一個MyInterface<String>類型的對象時,賦值給它的只能是MyInterface<String>類型 逆變性:泛型的類型參數可以從一個類變成它的派 ...
  • .NET 實用擴展方法(持續更新...) 1. 字元串轉換為可空數值類型(int, long, float...類似) /// /// 將字元串轉換成32位整數,轉換失敗返回null /// /// 轉換的字元串 /// 轉換之後的整數,或null public static int? TryPar ...
  • foreach: continue;:退出本次迴圈 break;:退出迴圈 return;:退出迴圈 List.Foreach: return;:退出本次迴圈 小結:list.Foreach中不能退出迴圈,foreach中的return;和List.Foreach中的return;用法不同 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...