ActiveMQ .NET類庫 ActiveMQ是一種開源的,實現了JMS規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級消息通信。 0. 準備 使用Nuget管理控制台安裝最新版 Apache.NMS.ActiveMQ PM Install Package ...
ActiveMQ .NET類庫
ActiveMQ是一種開源的,實現了JMS規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級消息通信。
0. 準備
使用Nuget管理控制台安裝最新版Apache.NMS.ActiveMQ
PM> Install-Package Apache.NMS.ActiveMQ
1. IMessageQueue 隊列介面
/// <summary>
/// 消息隊列介面
/// </summary>
public interface IMessageQueue
{
/// <summary>
/// 打開連接
/// </summary>
void Open();
/// <summary>
/// 關閉連接
/// </summary>
void Close();
}
2. ActiveMQ 抽象基類
/// <summary>
/// ActiveMQ
/// </summary>
public abstract class ActiveMQ
{
#region 監聽連接對象
protected IConnection _connection;
protected ISession _session;
protected IMessageConsumer _consumer;
#endregion
/// <summary>
/// 連接地址
/// </summary>
public string BrokerUri { get; set; }
/// <summary>
/// 用於登錄的用戶名,必須和密碼同時指定
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 用於登錄的密碼,必須和用戶名同時指定
/// </summary>
public string Password { get; set; }
/// <summary>
/// 隊列名稱
/// </summary>
public string QueueName { get; set; }
/// <summary>
/// 指定使用隊列的模式
/// </summary>
public MQMode MQMode { get; set; }
}
隊列模式:
/// <summary>
/// 隊列模式
/// </summary>
public enum MQMode
{
/// <summary>
/// 隊列,點對點模式。
/// 使用此模式。一個生產者向隊列存入一條消息之後,只有一個消費者能觸發消息接收事件。
/// </summary>
Queue,
/// <summary>
/// 主題,發佈者/訂閱模式。
/// 使用此模式,一個生產者向隊列存入一條消息之後,所有訂閱當前的主題的消費者都能觸發消息接收事件。
/// 使用此模式,必須先創建消費者,再創建生產者。
/// </summary>
Topic
}
3. ActiveMQProducer 生產者
/// <summary>
/// ActiveMQ生產者,打開連接,向指定隊列中發送數據
/// </summary>
public class ActiveMQProducer : ActiveMQ, IMessageQueue, IDisposable
{
/// <summary>
/// 隊列緩存字典
/// </summary>
private ConcurrentDictionary<string, IMessageProducer> _concrtProcuder = new ConcurrentDictionary<string, IMessageProducer>();
/// <summary>
/// 打開連接
/// </summary>
public void Open()
{
if (string.IsNullOrWhiteSpace(this.BrokerUri))
throw new MemberAccessException("未指定BrokerUri");
if (string.IsNullOrWhiteSpace(this.QueueName))
throw new MemberAccessException("未指定QueueName");
var factory = new ConnectionFactory(this.BrokerUri);
if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
_connection = factory.CreateConnection();
else
_connection = factory.CreateConnection(this.UserName, this.Password);
_connection.Start();
_session = _connection.CreateSession();
CreateProducer(this.QueueName);
}
/// <summary>
/// 關閉連接
/// </summary>
public void Close()
{
IMessageProducer _p = null;
foreach (var p in this._concrtProcuder)
{
if (this._concrtProcuder.TryGetValue(p.Key, out _p))
{
_p?.Close();
}
}
this._concrtProcuder.Clear();
_session?.Close();
_connection?.Close();
}
/// <summary>
/// 向隊列發送數據
/// </summary>
/// <typeparam name="T">數據類型</typeparam>
/// <param name="body">數據</param>
public void Put<T>(T body)
{
Send(this.QueueName, body);
}
/// <summary>
/// 向指定隊列發送數據
/// </summary>
/// <typeparam name="T">數據類型</typeparam>
/// <param name="body">數據</param>
/// <param name="queueName">指定隊列名</param>
public void Put<T>(T body, string queueName)
{
Send(queueName, body);
}
/// <summary>
/// 創建隊列
/// </summary>
/// <param name="queueName"></param>
private IMessageProducer CreateProducer(string queueName)
{
if (_session == null)
{
Open();
}
//創建新生產者
Func<string, IMessageProducer> CreateNewProducter = (name) =>
{
IMessageProducer _newProducer = null;
switch (MQMode)
{
case MQMode.Queue:
{
_newProducer = _session.CreateProducer(new ActiveMQQueue(name));
break;
}
case MQMode.Topic:
{
_newProducer = _session.CreateProducer(new ActiveMQTopic(name));
break;
}
default:
{
throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
}
}
return _newProducer;
};
return this._concrtProcuder.GetOrAdd(queueName, CreateNewProducter);
}
/// <summary>
/// 發送數據
/// </summary>
/// <param name="queueName">隊列名稱</param>
/// <typeparam name="T"></typeparam>
/// <param name="body">數據</param>
private void Send<T>(string queueName, T body)
{
var producer = CreateProducer(queueName);
IMessage msg;
if (body is byte[])
{
msg = producer.CreateBytesMessage(body as byte[]);
}
else if (body is string)
{
msg = producer.CreateTextMessage(body as string);
}
else
{
msg = producer.CreateObjectMessage(body);
}
if (msg != null)
{
producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
}
}
/// <summary>
/// 執行與釋放或重置非托管資源相關的應用程式定義的任務。
/// </summary>
public void Dispose()
{
this.Close();
}
}
4. ActiveMQConsumer 消費者
/// <summary>
/// ActiveMQ消費者,打開連接,監聽隊列,接收到數據之後觸發回調
/// </summary>
public class ActiveMQConsumer : ActiveMQ, IMessageQueue, IDisposable
{
/// <summary>
/// 接收到數據回調,ActiveMQ原生IMessage類型
/// </summary>
public Action<IMessage> OnMessageReceived { get; set; }
/// <summary>
/// 接收到消息回調(業務數據對象, 根據自己的業務靈活替換)
/// </summary>
public Action<DataCenterMessage> OnDataCenterMessageReceived { get; set; }
/// <summary>
/// 打開連接
/// </summary>
public void Open()
{
if (string.IsNullOrWhiteSpace(this.BrokerUri))
throw new MemberAccessException("未指定BrokerUri");
if (string.IsNullOrWhiteSpace(this.QueueName))
throw new MemberAccessException("未指定QueueName");
var factory = new ConnectionFactory(this.BrokerUri);
if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
_connection = factory.CreateConnection();
else
_connection = factory.CreateConnection(this.UserName, this.Password);
_connection.Start();
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
switch (MQMode)
{
case MQMode.Queue:
{
_consumer = _session.CreateConsumer(new ActiveMQQueue(this.QueueName));
break;
}
case MQMode.Topic:
{
_consumer = _session.CreateConsumer(new ActiveMQTopic(this.QueueName));
break;
}
default:
{
throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
}
}
}
/// <summary>
/// 關閉連接
/// </summary>
public void Close()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
/// <summary>
/// 開始監聽
/// </summary>
public void StartListen()
{
if (_consumer == null)
{
Open();
}
_consumer.Listener += new MessageListener(msg =>
{
if (OnMessageReceived != null)
OnMessageReceived(msg);
//轉換為業務需要的數據對象
if (OnDataCenterMessageReceived != null)
{
var objectMessage = msg as ActiveMQObjectMessage;
if (objectMessage != null)
{
var dataCenterMsg = objectMessage.Body as DataCenterMessage;
if (dataCenterMsg != null)
{
OnDataCenterMessageReceived(dataCenterMsg);
}
}
}
});
}
/// <summary>
/// 執行與釋放或重置非托管資源相關的應用程式定義的任務。
/// </summary>
public void Dispose()
{
this.Close();
}
}
5. 擴展方法
/// <summary>
/// 擴展方法類
/// </summary>
public static class ExtendMethods
{
/// <summary>
/// 將對象轉換為bytes
/// </summary>
/// <param name="obj"></param>
/// <returns>bytes</returns>
public static byte[] ToBytes<T>(this T obj) where T : class
{
if (obj == null)
return null;
using (var ms = new MemoryStream())
{
var formatter = new BinaryFormatter();
formatter.Serialize(ms, obj);
return ms.GetBuffer();
}
}
/// <summary>
/// 將bytes轉換為對象
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="bytes"></param>
/// <returns></returns>
public static T ToObject<T>(this byte[] bytes) where T : class
{
if (bytes == null)
return default(T);
using (var ms = new MemoryStream(bytes))
{
var formatter = new BinaryFormatter();
return formatter.Deserialize(ms) as T;
}
}
}
6. 使用示例:
#region 生產者
var producer = new ActiveMQProducer();
producer.BrokerUri = @"tcp://127.0.0.1:61616/";
producer.UserName = "admin";
producer.Password = "admin";
producer.QueueName = "TestQueueName";
producer.MQMode = MQMode.Queue;
producer.Open();
var message = new DataCenterMessage()
{
//初始化業務數據對象...
};
//發送到隊列, Put對象類必須使用[Serializable]註解屬性
producer.Put(message);
#endregion
#region 消費者
var consumer = new ActiveMQConsumer();
consumer.BrokerUri = @"tcp://127.0.0.1:61616/";
consumer.UserName = "admin";
consumer.Password = "admin";
consumer.QueueName = "TestQueueName";
consumer.MQMode = MQMode.Queue;
consumer.OnMessageReceived = (msg) =>
{
var bytesMessage = msg as ActiveMQBytesMessage;
if (bytesMessage != null)
{
var buffer = new byte[bytesMessage.BodyLength];
bytesMessage.WriteBytes(buffer);
var result = buffer.ToObject<DataCenterMessage>();
Debug.WriteLine(result);
}
};
consumer.OnDataCenterMessageReceived = (msg) =>
{
Debug.Write(msg);
};
consumer.Open();
consumer.StartListen();