在開發某一個需求的時候,領導要求使用RocketMQ(阿裡雲版) 作為消息隊列。生產者主要有WebAPI/MVC/JOB(控制台應用程式),然後消費者採用的是Windows服務。那[西瓜程式猿]來記錄一下如何使用RocketMQ(阿裡雲版),給各位小伙伴作為參考防止踩坑。 ...
章節
第一章:https://www.cnblogs.com/kimiliucn/p/17662052.html
第二章:
作者:西瓜程式猿
主頁傳送門:https://www.cnblogs.com/kimiliucn/
開發背景
在開發某一個需求的時候,領導要求使用RocketMQ(阿裡雲版) 作為消息隊列。使用的版本是5.x,目前也已經沒有4.x購買的入口了,所以只能買5.x系列。公司項目還是用的比較老的技術.NET Framework 4.8,生產者主要有WebAPI/MVC/JOB(控制台應用程式),然後消費者採用的是Windows服務進行長鏈接消費信息。這期間因為各種原因踩過很多坑,然後咨詢了客服說RocketMQ(阿裡雲版)5.0不支持.NET Framework,但最終操作下來竟然能使用(只支持集群模式,不支持訂閱模式),那今天[西瓜程式猿]來記錄一下如何使用RocketMQ(阿裡雲版),給各位小伙伴作為參考防止踩坑。
環境版本
阿裡雲RocketMQ版本:5.0系列
.NET版本:.NET Framework 4.8
.NET版本:生產端(WebAPI/MVC/JOB)、消費端(Windows服務)
如果不知道怎麼選,或者不知道怎麼買雲消息隊列RocketMQ(阿裡雲版)?可以聯繫我[西瓜程式猿],如果需要特價購買可以通過下麵地址訪問:
一、RocketMQ基本介紹
官網地址:http://rocketmq.apache.org
RocketMQ阿裡雲-官方文檔:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/product-overview/basic-concepts?spm=a2c4g.11186623.0.0.513c5b3aztI6tB
1.1-RocketMQ簡介
RocketMQ(Apache RocketMQ)是一個開源的分散式消息中間件系統,由阿裡巴巴集團旗下的阿裡雲計算平臺團隊開發和維護。它最初是為滿足阿裡巴巴內部大規模分散式消息傳遞需求而設計的,後來成為 Apache 基金會的頂級開源項目之一。
1.2-RocketMQ優勢
在眾多應用場景中廣泛應用,如電子商務、物流配送、金融支付、大數據處理等。它被許多企業用於構建高性能和可靠的消息隊列系統,實現非同步通信和解耦應用程式組件。RocketMQ 提供了可靠、可擴展和高性能的消息傳遞解決方案,具備以下特點:
- 非同步通信:RocketMQ 支持發佈-訂閱和點對點兩種消息通信模式,以滿足不同場景下的需求。
- 高可靠性:提供多種存儲選項,包括本地文件存儲和遠程共用存儲,以確保消息的可靠傳輸和持久化。
- 高吞吐量:支持水平擴展,可以輕鬆應對大規模消息傳遞和高併發的需求。
- 嚴格有序性:支持消息按照發送順序和消費順序進行有序處理,保證消息的順序性。
- 分散式架構:採用分散式架構,具備良好的橫向擴展能力和高可用性。
1.3-RocketMQ基本概念
主題(Topic):雲消息隊列 RocketMQ 版中消息傳輸和存儲的頂層容器,用於標識同一類業務邏輯的消息。主題通過TopicName來做唯一標識和區分。
消息類型(MessageType):雲消息隊列 RocketMQ 版中按照消息傳輸特性的不同而定義的分類,用於類型管理和安全校驗。雲消息隊列 RocketMQ 版支持的消息類型有普通消息、順序消息、事務消息和定時/延時消息。
消息隊列(MessageQueue):隊列是雲消息隊列 RocketMQ 版中消息存儲和傳輸的實際容器,也是消息的最小存儲單元。雲消息隊列 RocketMQ 版的所有主題都是由多個隊列組成,以此實現隊列數量的水平拆分和隊列內部的流式存儲。隊列通過QueueId來做唯一標識和區分。
消息(Message):消息是雲消息隊列 RocketMQ 版中的最小數據傳輸單元。生產者將業務數據的負載和拓展屬性包裝成消息發送到雲消息隊列 RocketMQ 版服務端,服務端按照相關語義將消息投遞到消費端進行消費。
消息視圖(MessageView):消息視圖是雲消息隊列 RocketMQ 版面向開發視角提供的一種消息只讀介面。通過消息視圖可以讀取消息內部的多個屬性和負載信息,但是不能對消息本身做任何修改。
消息標簽(MessageTag):消息標簽是雲消息隊列 RocketMQ 版提供的細粒度消息分類屬性,可以在主題層級之下做消息類型的細分。消費者通過訂閱特定的標簽來實現細粒度過濾。
消息位點(MessageQueueOffset):消息是按到達雲消息隊列 RocketMQ 版服務端的先後順序存儲在指定主題的多個隊列中,每條消息在隊列中都有一個唯一的Long類型坐標,這個坐標被定義為消息位點。
消費位點(ConsumerOffset):一條消息被某個消費者消費完成後不會立即從隊列中刪除,雲消息隊列 RocketMQ 版會基於每個消費者分組記錄消費過的最新一條消息的位點,即消費位點。
消息索引(MessageKey):消息索引是雲消息隊列 RocketMQ 版提供的面向消息的索引屬性。通過設置的消息索引可以快速查找到對應的消息內容。
生產者(Producer):生產者是雲消息隊列 RocketMQ 版系統中用來構建並傳輸消息到服務端的運行實體。生產者通常被集成在業務系統中,將業務消息按照要求封裝成雲消息隊列 RocketMQ 版的消息併發送至服務端。
事務檢查器(TransactionChecker):雲消息隊列 RocketMQ 版中生產者用來執行本地事務檢查和異常事務恢復的監聽器。事務檢查器應該通過業務側數據的狀態來檢查和判斷事務消息的狀態。
事務狀態(TransactionResolution):雲消息隊列 RocketMQ 版中事務消息發送過程中,事務提交的狀態標識,服務端通過事務狀態控制事務消息是否應該提交和投遞。事務狀態包括事務提交、事務回滾和事務未決。
消費者分組(ConsumerGroup):消費者分組是雲消息隊列 RocketMQ 版系統中承載多個消費行為一致的消費者的負載均衡分組。和消費者不同,消費者分組並不是運行實體,而是一個邏輯資源。在雲消息隊列 RocketMQ 版中,通過消費者分組內初始化多個消費者實現消費性能的水平擴展以及高可用容災。
消費者(Consumer):消費者是雲消息隊列 RocketMQ 版中用來接收並處理消息的運行實體。消費者通常被集成在業務系統中,從雲消息隊列 RocketMQ 版服務端獲取消息,並將消息轉化成業務可理解的信息,供業務邏輯處理。
消費結果(ConsumeResult):雲消息隊列 RocketMQ 版中PushConsumer消費監聽器處理消息完成後返回的處理結果,用來標識本次消息是否正確處理。消費結果包含消費成功和消費失敗。
訂閱關係(Subscription):訂閱關係是雲消息隊列 RocketMQ 版系統中消費者獲取消息、處理消息的規則和狀態配置。訂閱關係由消費者分組動態註冊到服務端系統,併在後續的消息傳輸中按照訂閱關係定義的過濾規則進行消息匹配和消費進度維護。
消息過濾:消費者可以通過訂閱指定消息標簽(Tag)對消息進行過濾,確保最終只接收被過濾後的消息合集。過濾規則的計算和匹配在雲消息隊列 RocketMQ 版的服務端完成。
重置消費位點:以時間軸為坐標,在消息持久化存儲的時間範圍內,重新設置消費者分組對已訂閱主題的消費進度,設置完成後消費者將接收設定時間點之後,由生產者發送到雲消息隊列 RocketMQ 版服務端的消息。
消息軌跡:在一條消息從生產者發出到消費者接收並處理過程中,由各個相關節點的時間、地點等數據匯聚而成的完整鏈路信息。通過消息軌跡,您能清晰定位消息從生產者發出,經由雲消息隊列 RocketMQ 版服務端,投遞給消費者的完整鏈路,方便定位排查問題。
消息堆積:生產者已經將消息發送到雲消息隊列 RocketMQ 版的服務端,但由於消費者的消費能力有限,未能在短時間內將所有消息正確消費掉,此時在雲消息隊列 RocketMQ 版的服務端保存著未被消費的消息,該狀態即消息堆積。
事務消息:事務消息是雲消息隊列 RocketMQ 版提供的一種高級消息類型,支持在分散式場景下保障消息生產和本地事務的最終一致性。
定時/延時消息:定時/延時消息是雲消息隊列 RocketMQ 版提供的一種高級消息類型,消息被髮送至服務端後,在指定時間後才能被消費者消費。通過設置一定的定時時間可以實現分散式場景的延時調度觸發效果。
順序消息:順序消息是雲消息隊列 RocketMQ 版提供的一種高級消息類型,支持消費者按照發送消息的先後順序獲取消息,從而實現業務場景中的順序處理。
二、RocketMQ前期準備
首先需要下載相關.NET相關的SDK,然後在阿裡雲後臺找到【實例用戶名】【實例密碼】【接入點鏈接信息】等信息,最後還需要創建【Group ID】和【Topic】用於給我們調用。
2.1-下載資源包及SDK
[西瓜程式猿]給正在看這篇文章的小伙伴提供了資源包,【ONSClient4CPP】文件夾裡面包含使用RocketMQ阿裡雲版本要依賴的DLL文件,【RocketMQ_SDK】文件夾包含了.NET Framework使用RocketMQ阿裡雲版本要用到的SDK文件,【vcredistx64】文件夾包含了Visual C++ 2015運行時環境安裝包,因為C++ DLL文件需要依賴這個,這個需要進行安裝。還包含其他輔助的工具及代碼。
可以訪問下載(如果失效了,請聯繫我)。
下載地址(編碼:stalua6n):https://yongteng.lanzoub.com/ice5a16p978h
密碼:1q81
文件截圖:
2.2-查詢基本配置信息
(1)首先點擊下麵鏈接進入消息隊列RocketMQ工作台,如果沒有登錄首先要進行登錄。然後在【資源分佈】裡面找到要操作的地域列表,點擊【地功能變數名稱稱】。
消息隊列RocketMQ(阿裡雲版):https://ons.console.aliyun.com/overview
(2)然後可以看到實例列表,找到要操作的實例,點擊【詳情】。
(3)然後在【運行信息】中找到【實例用戶名】和【實例密碼】,註意不是實例ID/實例名稱。
(4)然後還在當前頁面,往下翻到【TCP 協議接入點】中找到接入點和網路信息。如果大家需要在外網訪問自行開通公網訪問,好像需要另外付費。[西瓜程式猿]這邊只能通過【VPC專有網路】訪問,也就是只能在內網訪問。所以我以VPC專有網路來介紹。
那我們就把必要的信息都集齊全了,分別是【實例用戶名】【實例密碼】【TCP 協議接入點連接】。
2.3-配置Topic和Group
那什麼是Topic呢?雲消息隊列 RocketMQ 版中消息傳輸和存儲的頂層容器,用於標識同一類業務邏輯的消息。主題通過TopicName來做唯一標識和區分。可以理解為不同的系統、不同的發佈環境配置不同的Topic。然後來說一下如何配Topic和GroupID。
(1)在左側導航欄找到【Topic管理】,然後點擊【創建Topic】。名稱和描述都是必填的,消息類型根據自己業務場景選擇。[西瓜程式猿]這邊要求消息按照順序發送和消費,所以選擇【順序消息】。
(2)然後再來創建GroupID。一個 Group ID 代表一個 Consumer 實例群組。同一個消費者 Group ID 下所有的 Consumer 實例必須保證訂閱的 Topic 一致,並且也必須保證訂閱 Topic 時設置的過濾規則(Tag)一致。否則您的消息可能會丟失。
那我們就把必要的資源都創建好了,分別是【Topic名稱】【GroupID】。
Topic名稱:
GroupID:
二、RockeetMQ核心部分封裝
2.1-創建類庫項目
(1)點擊【創建新項目】,然後選擇【類庫(.NET Framework)】。
目錄:
(2)然後新建一個【SDK】文件夾,將下載好的資源包裡面文件夾【RocketMQ_SDK】的文件,複製到項目中【SDK】文件夾裡面。
資源包:
項目:
(3)然後就安裝相關的包,分別是【log4net】用來記錄日誌,【Newtonsoft.Json】用來做JSON序列化和反序列化。(如果自己項目中有日誌系統和反序列化工具,也可以不安裝,根據自己項目依賴公共輔助層去使用)
(4)創建了一個【Helper】文件夾寫一個JSON反序列化的幫助類(根據自己業務需要創建)。
目錄:
代碼;
public class JsonUtility
{
/// <summary>
/// 將實體類序列化為JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data"></param>
/// <returns></returns>
static public string SerializeJSON<T>(T data)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(data);
}
/// <summary>
/// 反序列化JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="json"></param>
/// <returns></returns>
static public T DeserializeJSON<T>(string json)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// 將IEnumerable<T,V>序列化為JSON
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
static public string SerializeDictionary(IEnumerable<KeyValuePair<string, string>> value)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(value.Select(I => new { label = I.Key, value = I.Value }));
}
}
(5)然後在創建一個【Attributes】文件夾。在裡面新建兩個Attribute特性,一個【ConsumerTagAttribute】用來區分Tag標簽,另一個【EventTypeAttribute】用來區分事件類型。
目錄:
代碼:
/// <summary>
/// Tag標簽
/// </summary>
public class ConsumerTagAttribute : Attribute
{
public string Tag { get; set; }
public ConsumerTagAttribute(string tag)
{
Tag = tag;
}
}
/// <summary>
/// 事件類型
/// </summary>
public class EventTypeAttribute : Attribute
{
public string EventType { get; set; }
public EventTypeAttribute(string eventType)
{
EventType = eventType;
}
}
2.2-封裝傳輸實體模型
然後我們需要設計生產者和消費者直接需要傳輸共同的消息時哪些。
目前想到的(如果有好的建議可以在評論區討論哈):
MessageId:消息id
Tag:對應RocketMQ中Tag
SendTime:發送時間
Source:消息來源
EventType:事件類型
Body:消息體
目錄:
(1)創建一個【Models】文件夾,用來存相關的實體。然後創建【IQueueOnsCommonModel】生產者/消費者公共模型介面,然後創建【QueueOnsCommonModel】文件實現IQueueOnsCommonModel介面。
IQueueOnsCommonModel:
/// <summary>
/// 生產者/消費者公共模型介面
/// </summary>
public interface IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
string MessageId { get; set; }
/// <summary>
/// 對應RocketMQ中Tag
/// </summary>
string Tag { get; set; }
/// <summary>
/// 發送時間
/// </summary>
DateTime SendTime { get; set; }
/// <summary>
/// 消息來源
/// </summary>
string Source { get; set; }
/// <summary>
/// 事件類型
/// </summary>
string EventType { get; set; }
/// <summary>
/// 消息體
/// </summary>
string Body { get; set; }
}
QueueOnsCommonModel:
/// <summary>
/// 生產者/消費者公共模型實現
/// </summary>
public class QueueOnsCommonModel : IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 對應RocketMQ中Tag
/// </summary>
public string Tag { get; set; }
/// <summary>
/// 發送時間
/// </summary>
public DateTime SendTime { get; set; }
/// <summary>
/// 消息來源
/// </summary>
public string Source { get; set; }
/// <summary>
/// 事件類型
/// </summary>
public string EventType { get; set; }
/// <summary>
/// 消息體
/// </summary>
public string Body { get; set; }
}
(2)創建一個【ONSPropertyConfigModel】文件,用來做配置文件的實體。
/// <summary>
/// RocketMQ配置屬性
/// </summary>
public class ONSPropertyConfigModel
{
/// <summary>
/// 設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例用戶名。
/// </summary>
public string AccessKey { get; set; }
/// <summary>
/// 設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例密碼。
/// </summary>
public string SecretKey { get; set; }
/// <summary>
/// 設置為您在雲消息隊列 RocketMQ 版控制台創建的Group ID。
/// </summary>
public string GroupId { get; set; }
/// <summary>
/// 您在雲消息隊列 RocketMQ 版控制台創建的Topic。
/// </summary>
public string Topics { get; set; }
/// <summary>
/// 設置為您從雲消息隊列 RocketMQ 版控制台獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”
/// </summary>
public string NAMESRV_ADDR { get; set; }
/// <summary>
/// 消費者/生產者目標來源
/// </summary>
public string OnsClientCode { get; set; }
}
(3)然後創建一個【QueueTagConsts】文件,用來訂單消息隊列Tag常量,和一個【QueueOnsEventType】文件,用來定義事件類型。
目錄:
QueueTagConsts:
/// <summary>
/// 消息隊列Tag常量定義
/// 命名規範:項目名_自定義業務名_Tag
/// </summary>
public class QueueTagConsts
{
/// <summary>
/// 測試Sample
/// </summary>
public const string XG_Blog_Sample_Tag = "XG_Blog_Sample_Tag";
}
QueueOnsEventType:
/// <summary>
/// 消息隊列-事件類型
/// </summary>
public class QueueOnsEventType
{
/// <summary>
/// RocketMQ測試
/// </summary>
public const string RocketMQ_TEST = "RocketMQ_TEST";
}
2.3-封裝連接RocketMQ
創建一個【Core】文件夾,然後創建一個【IConsumerMsg】消費介面,和一個【QueueOnsProducer】文件用來封裝RocketMQ生產者連接。
目錄:
IConsumerMsg:
/// <summary>
/// 消費介面
/// </summary>
public interface IConsumerMsg
{
void Consume(QueueOnsCommonModel model);
}
QueueOnsProducer:
/// <summary>
/// 消息隊列-RocketMQ生產者
/// </summary>
public class QueueOnsProducer
{
private static Producer _producer;
private static PushConsumer _consumer;
private readonly static ILog logger = LogManager.GetLogger(typeof(QueueOnsProducer));
private static string Ons_Topic = "";
private static string Ons_AccessKey = "";
private static string Ons_SecretKey = "";
private static string Ons_GroupId = "";
private static string Ons_NameSrv = "";
private static int Ons_ConsumptionPattern = 1;
private static string Ons_Client_Code = "Test_RocketMQ_Producer";
private const string Ons_LogPath = "C://rocket_mq_logs";
public static string getOnsTopic
{
get
{
return Ons_Topic;
}
}
public static string getOnsClientCode
{
get
{
return Ons_Client_Code;
}
}
private static ONSFactoryProperty getFactoryPropertyProducer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
return factoryInfo;
}
private static ONSFactoryProperty getFactoryPropertyConsumer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
//消費模式(1:集群消費、2:廣播消費)
if (Ons_ConsumptionPattern == 1)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
}
else if (Ons_ConsumptionPattern == 2)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
}
return factoryInfo;
}
public static void CreateProducer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_producer = ONSFactory.getInstance().createProducer(getFactoryPropertyProducer());
}
public static void StartProducer()
{
if (_producer != null)
{
_producer.start();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]生產者 啟動 成功!";
logger.Info(msg);
}
else
{
throw new ArgumentNullException("_producer is null,請先執行[CreateProducer]創建生產者後啟動");
}
}
public static void ShutdownProducer()
{
if (_producer != null)
{
_producer.shutdown();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]生產者 已關閉連接!";
logger.Info(msg);
}
}
public static string SendMessage(QueueOnsCommonModel model, string tag = "RegisterLog")
{
if (model == null) { throw new ArgumentNullException("model is null"); }
model.SendTime = DateTime.Now;
model.Source = Ons_Client_Code;
var send_str = JsonUtility.SerializeJSON(model);
byte[] bytes = Encoding.UTF8.GetBytes(send_str);
string str_new_msg = Encoding.Default.GetString(bytes);
logger.Info("【發送隊列消息】消息內容:" + str_new_msg);
string msg_key = model.MessageId;
string msg_id = string.Empty;
Message msg = new Message(Ons_Topic, tag, str_new_msg);
msg.setKey(msg_key);
try
{
SendResultONS sendResult = _producer.send(msg);
msg_id = sendResult.getMessageId();
logger.Info("【發送隊列消息】消息ID:" + msg_id);
}
catch (Exception ex)
{
logger.Error($"【發送隊列消息】發生異常了:{ex.Message}", ex);
throw ex;
}
return msg_id;
}
public static void CreatePushConsumer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
// 集群消費。
Ons_ConsumptionPattern = 1;
// 廣播消費。
//Ons_ConsumptionPattern = 2;
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryPropertyConsumer());
}
public static void SetPushConsumer(MessageListener listener, string subExpression = "*")
{
_consumer.subscribe(Ons_Topic, subExpression, listener);
}
public static void StartPushConsumer()
{
_consumer.start();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]消費者 啟動 成功!";
logger.Info(msg);
}
public static void ShutdownPushConsumer()
{
if (_consumer != null)
{
_consumer.shutdown();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]消費者 已關閉連接!";
logger.Info(msg);
}
}
}
三、生產端實現
3.1-創建生產者
3.1.1-創建MVC項目
(1)然後創建一個生產者,可以創建WebAPI/MVC/JOB(控制台應用程式)等等,那[西瓜程式猿]以MVC項目作為例子來介紹一下,創建一個名為【RocketMQ.Producer】項目。
運行測試一下:
3.1.2-項目依賴配置
阿裡雲提供的.NET版本是基於雲消息隊列 RocketMQ 版的CPP版本的托管封裝,這樣能保證.NET完全不依賴於Windows.NET公共庫。內部採用C++多線程併發處理,保證.NET版本的高效穩定。
(1)底層的C++ DLL相關文件,以及Visual C++ 2015運行時環境安裝包。如果沒有安裝Visual Studio 2015運行時環境,需要在資源包找到【vc_redist.x64.exe】文件進行安裝。
(2)在使用Visual Studio(VS)開發.NET的應用程式和類庫時,預設的目標平臺為“Any CPU”。但是.NET SDK僅支持Windows 64-bit操作系統,所以需要自行設置。先右擊【RocketMQ.Producer】項目,然後點擊【屬性】。
(3)點擊左側選項的【生成】,然後將目標平臺改為【x64】。
(4)將資源包【ONSClient4CPP】文件夾裡面所有的文件,複製到【bin】目錄下。
資源包:
項目:
3.1.3-使用log4net
(1)使用lo4net輸出日誌,大家也可以用別的日誌框架,記得在用到寫入日誌的地方自行進行修改。那[西瓜程式猿]使用log4net來介紹。我們在項目的根目錄下創建一個文件為【log4net.config】。
(2)【log4net.config】內容如下。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
</configSections>
<system.web>
<compilation debug="true" targetFramework="4.5.2" />
<httpRuntime targetFramework="4.5.2" />
</system.web>
<log4net>
<!--錯誤日誌:::記錄錯誤日誌-->
<!--按日期分割日誌文件 一天一個-->
<!-- appender 定義日誌輸出方式 將日誌以回滾文件的形式寫到文件中。-->
<appender name="ErrorAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路徑:下麵路徑項目啟動的時候自動在C盤中創建log、logError文件-->
<file value="log/error/error_" />
<!-- 如果想在本項目中添加路徑,那就直接去掉C:\\ 只設置log\\LogError 項目啟動中預設創建文件 -->
<appendToFile value="true"/>
<!--按照何種方式產生多個日誌文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--這是按日期產生文件夾-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只寫到一個文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件數量 超過此數量後 自動刪除之前的 好像只有在 按Size分割時有效 設定值value="-1"為不限文件數-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每個文件的大小。只在混合方式與文件大小方式下使用。超出大小後在所有文件名後自動增加正整數重新命名,數字最大的最早寫入。可用的單位:KB|MB|GB。不要使用小數,否則會一直寫入當前日誌-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的輸出格式,也可以是xml 一個Appender只能是一個layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每條日誌末尾的文字說明-->
<!--輸出格式 模板-->
<!-- <param name="ConversionPattern" value="記錄時間:%date 線程ID:[%thread] 日誌級別:%-5level 記錄類:%logger
操作者ID:%property{Operator} 操作類型:%property{Action}%n 當前機器名:%property%n當前機器名及登錄用戶:%username %n
記錄位置:%location%n 消息描述:%property{Message}%n 異常:%exception%n 消息:%message%newline%n%n" />-->
<!--樣例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n記錄時間:%date %n線程ID:[%thread] %n日誌級別: %-5level %n錯誤描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日誌級別】%-5level
%n【記錄時間】%date
%n【執行時間】[%r]毫秒
%n【出錯文件】%F
%n【出錯行號】%L
%n【出錯的類】%logger 屬性[%property{NDC}]
%n【錯誤描述】%message
%n【錯誤詳情】%newline"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="ERROR" />
<levelMax value="FATAL" />
</filter>
</appender>
<!--DEBUG:::記錄DEBUG日誌-->
<!--按日期分割日誌文件 一天一個-->
<!-- appender 定義日誌輸出方式 將日誌以回滾文件的形式寫到文件中。-->
<appender name="DebugAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路徑:下麵路徑項目啟動的時候自動在C盤中創建log、logError文件-->
<file value="log/debug/debug_" />
<!-- 如果想在本項目中添加路徑,那就直接去掉C:\\ 只設置log\\LogError 項目啟動中預設創建文件 -->
<appendToFile value="true"/>
<!--按照何種方式產生多個日誌文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--這是按日期產生文件夾-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只寫到一個文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件數量 超過此數量後 自動刪除之前的 好像只有在 按Size分割時有效 設定值value="-1"為不限文件數-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每個文件的大小。只在混合方式與文件大小方式下使用。超出大小後在所有文件名後自動增加正整數重新命名,數字最大的最早寫入。可用的單位:KB|MB|GB。不要使用小數,否則會一直寫入當前日誌-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的輸出格式,也可以是xml 一個Appender只能是一個layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每條日誌末尾的文字說明-->
<!--輸出格式 模板-->
<!-- <param name="ConversionPattern" value="記錄時間:%date 線程ID:[%thread] 日誌級別:%-5level 記錄類:%logger
操作者ID:%property{Operator} 操作類型:%property{Action}%n 當前機器名:%property%n當前機器名及登錄用戶:%username %n
記錄位置:%location%n 消息描述:%property{Message}%n 異常:%exception%n 消息:%message%newline%n%n" />-->
<!--樣例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n記錄時間:%date %n線程ID:[%thread] %n日誌級別: %-5level %n錯誤描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日誌級別】%-2level
%n【記錄時間】%date
%n【執行時間】[%r]毫秒
%n【debug文件】%F
%n【debug行號】%L
%n【debug類】%logger 屬性[%property{NDC}]
%n【debug描述】%message"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="DEBUG" />
<levelMax value="WARN" />
</filter>
</appender>
<!--INFO:::記錄INFO日誌-->
<!--按日期分割日誌文件 一天一個-->
<!-- appender 定義日誌輸出方式 將日誌以回滾文件的形式寫到文件中。-->
<appender name="INFOAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路徑:下麵路徑項目啟動的時候自動在C盤中創建log、logError文件-->
<file value="log/info/info_" />
<!-- 如果想在本項目中添加路徑,那就直接去掉C:\\ 只設置log\\LogError 項目啟動中預設創建文件 -->
<appendToFile value="true"/>
<!--按照何種方式產生多個日誌文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--這是按日期產生文件夾-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只寫到一個文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件數量 超過此數量後 自動刪除之前的 好像只有在 按Size分割時有效 設定值value="-1"為不限文件數-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每個文件的大小。只在混合方式與文件大小方式下使用。超出大小後在所有文件名後自動增加正整數重新命名,數字最大的最早寫入。可用的單位:KB|MB|GB。不要使用小數,否則會一直寫入當前日誌-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的輸出格式,也可以是xml 一個Appender只能是一個layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每條日誌末尾的文字說明-->
<!--輸出格式 模板-->
<!-- <param name="ConversionPattern" value="記錄時間:%date 線程ID:[%thread] 日誌級別:%-5level 記錄類:%logger
操作者ID:%property{Operator} 操作類型:%property{Action}%n 當前機器名:%property%n當前機器名及登錄用戶:%username %n
記錄位置:%location%n 消息描述:%property{Message}%n 異常:%exception%n 消息:%message%newline%n%n" />-->
<!--樣例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n記錄時間:%date %n線程ID:[%thread] %n日誌級別: %-5level %n錯誤描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日誌級別】%-2level
%n【記錄時間】%date
%n【執行時間】[%r]毫秒
%n【info文件】%F
%n【info行號】%L
%n【info類】%logger 屬性[%property{NDC}]
%n【info描述】%message"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="INFO" />
<levelMax value="WARN" />
</filter>
</appender>
<!--Set root logger level to DEBUG and its only appender to A1-->
<root>
<!--控制級別,由低到高: ALL|DEBUG|INFO|WARN|ERROR|FATAL|OFF-->
<level value="ALL" />
<appender-ref ref="DebugAppender" />
<appender-ref ref="ErrorAppender" />
<appender-ref ref="INFOAppender" />
</root>
</log4net>
</configuration>
(3)並且右擊【log4net.config】文件,點擊【屬性】,然後將[複製到輸出目錄]設置為【始終複製】。
(4)然後安裝log4net。在項目目錄中右擊【引用】,然後點擊【管理NuGet程式包】
(5)然後點擊瀏覽,搜索【log4net】,右側點擊安裝即可。
(6)然後在【Global.asax】文件中註冊log4net。
protected void Application_Start()
{
XmlConfigurator.Configure(new System.IO.FileInfo(Server.MapPath("~/log4net.config")));
}
3.1.4-封裝發送消息
(1)在當前項目新建一個【Services】文件夾,作為服務層。大家也可以將Services創建為單獨的類庫,然後在這個項目上去引入【RocketMQ.Core】,在用【RocketMQ.Producer】項目區引入【Services】。那[西瓜程式猿]為了方便就直接在當前項目寫了。然後再【Services】文件夾裡面創建【BaseProducerService】文件,用於封裝生產者發送消息服務。
目錄:
代碼:
/// <summary>
/// 生產者服務
/// </summary>
public class BaseProducerService
{
private readonly ILog logger = log4net.LogManager.GetLogger(typeof(BaseProducerService));
public void SendQueueOnsProducer(string body, string msg_tag, string mgs_eventType)
{
if (string.IsNullOrEmpty(body)) { throw new ArgumentNullException("body is null"); }
if (string.IsNullOrEmpty(msg_tag)) { throw new ArgumentNullException("msg_tag is null"); }
if (string.IsNullOrEmpty(mgs_eventType)) { throw new ArgumentNullException("mgs_eventType is null"); }
string ons_topic = QueueOnsProducer.getOnsTopic;
string ons_client_code = QueueOnsProducer.getOnsClientCode;
//TODO:這裡需要生成唯一ID
string businessId = "MQ_1001";
logger.Info($"【發送RocketMQ消息隊列消息】準備開始執行了:(消息key:{businessId})(tag:{msg_tag})(event_type:{mgs_eventType})");
logger.Info($"【發送RocketMQ消息隊列消息】消息內容:{body}");
// TODO:在這裡可以持久化生產者消息
logger.Info($"【發送RocketMQ消息隊列消息】消息持久化成功!(消息主鍵id:{businessId})");
Task.Run(() =>
{
try
{
QueueOnsProducer.SendMessage(new QueueOnsCommonModel()
{
MessageId = businessId,
Tag = msg_tag,
EventType = mgs_eventType,
Body = body
}, msg_tag);
logger.Info($"【發送RocketMQ消息隊列消息】消息發送成功!");
}
catch (Exception ex)
{
logger.Error($"【發送RocketMQ消息隊列消息】發生異常:{ex.Message}", ex);
}
});
}
}
3.2-配置連接信息
(1)然後右擊【RocketMQ.Producer】項目下,點擊【引用】,然後將【RocketMQ.Core】項目勾選上確定。
(2)然後將前期準備的基本信息放在配置文件中。在【Web.config】文件進行配置。
代碼:
<!--消息隊列:RocketMQ-->
<!--設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例用戶名。-->
<add key="ons_access_key" value="xxx" />
<!--設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例密碼。-->
<add key="ons_secret_key" value="xxx" />
<!--您在雲消息隊列 RocketMQ 版控制台創建的Topic。-->
<add key="ons_topic" value="XG_CXY_Test" />
<!--設置為您在雲消息隊列 RocketMQ 版控制台創建的Group ID。-->
<add key="ons_groupId" value="XG_CXY_Group_Test" />
<!--設置為您從雲消息隊列 RocketMQ 版控制台獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”-->
<add key="ons_name_srv" value="xxx-xxx-xxx-xxx.rmq.aliyuncs.com:8080" />
<!--消費者/生產者目標來源-->
<add key="ons_client_code" value="XG_CXY_Producer_Develop" />
(3)然後創建一個【Config】文件夾,寫一個獲得【ConfigGeter】配置文件的幫助類。
代碼:
/// <summary>
/// 配置文件
/// </summary>
public class ConfigGeter
{
private static T TryGetValueFromConfig<T>(Func<string, T> parseFunc, Func<T> defaultTValueFunc, [CallerMemberName] string key = "", string supressKey = "")
{
try
{
if (!string.IsNullOrWhiteSpace(supressKey))
{
key = supressKey;
}
var node = ConfigurationManager.AppSettings[key];
return !string.IsNullOrEmpty(node) ? parseFunc(node) : defaultTValueFunc();
}
catch (Exception ex)
{
return default(T);
}
}
#region 消息隊列:RocketMQ
/// <summary>
/// 設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例用戶名。
/// </summary>
public static string ons_access_key
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 設置為雲消息隊列 RocketMQ 版控制台實例詳情頁的實例密碼。
/// </summary>
public static string ons_secret_key
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 您在雲消息隊列 RocketMQ 版控制台創建的Topic。
/// </summary>
public static string ons_topic
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 設置為您在雲消息隊列 RocketMQ 版控制台創建的Group ID。
/// </summary>
public static string ons_groupId
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 設置為您從雲消息隊列 RocketMQ 版控制台獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
/// </summary>
public static string ons_name_srv
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 消息來源(生產者/消費端客戶端編碼)
/// </summary>
public static string ons_client_code
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
#endregion
}
3.3-啟動生產者
3.3.1-MVC/WebAPI項目
在【Global.asax】文件Application_Start方法中創建生產者,主要就是從配置文件中獲得配置信息,然後調用【QueueOnsProducer.CreateProducer】方法創建消息隊列生產者,通過調用【QueueOnsProducer.StartProducer】方法來啟動生產者。
代碼:
protected void Application_Start()
{
//創建生產者[西瓜程式猿]
string ons_access_key = ConfigGeter.ons_access_key;
string ons_secret_key = ConfigGeter.ons_secret_key;
string ons_topic = ConfigGeter.ons_topic;
string ons_groupId = ConfigGeter.ons_groupId;
string ons_name_srv = ConfigGeter.ons_name_srv;
string ons_client_code = ConfigGeter.ons_client_code;
QueueOnsProducer.CreateProducer(new ONSPropertyConfigModel()
{
AccessKey = ons_access_key,
SecretKey = ons_secret_key,
Topics = ons_topic,
GroupId = ons_groupId,
NAMESRV_ADDR = ons_name_srv,
OnsClientCode = ons_client_code,
});
//啟動生產者
QueueOnsProducer.StartProducer();
}
3.3.2-JOB(控制台應用程式)項目
在【Program.cs】項目啟動文件的Main方法中創建生產者,主要就是從配置文件中獲得配置信息,然後調用【QueueOnsProducer.CreateProducer】方法創建消息隊列生產者,通過調用【QueueOnsProducer.StartProducer】方法來啟動生產者。
3.4-發送消息
(1)先設計好消息傳輸內容(Body)實體,比如我這邊需要根據姓名/賬號做一些非同步業務處理,那我這筆就新建一個名為【RocketMQSampleModel】類。
目錄:
代碼:
/// <summary>
/// 發送RocketMQ測試消息實體
/// </summary>
public class RocketMQSampleModel
{
public string user_name { get; set; }
public string user_account { get; set; }
}
(2)然後就開始創建具體的發送RocketMQ消息的服務,可以根據自己的業務去創建,那[西瓜程式猿]這邊就創建一個名為【SampleProducerService】的發送RocketMQ消息服務,然後繼承【BaseProducerService】類。
目錄:
代碼:
public class SampleProducerService : BaseProducerService
{
/// <summary>
/// 發送RocketMQ測試消息
/// </summary>
/// <param name="model"></param>
public void SendTestMessageHandle(RocketMQSampleModel model)
{
if (model == null) return;
string msg_body = JsonUtility.SerializeJSON<RocketMQSampleModel>(model);
if (msg_body != null)
{
SendQueueOnsProducer(msg_body, QueueTagConsts.XG_Blog_Sample_Tag, QueueOnsEventType.RocketMQ_TEST);
}
}
}
(3)然後我們在Controller裡面去調用一下發送消息。[西瓜程式猿]這裡以【Home/Index】裡面進行使用。
截圖:
代碼:
//調用消息隊列
new SampleProducerService().SendTestMessageHandle(new RocketMQSampleModel()
{
user_name = "西瓜程式猿",
user_account = "admin"
});
(4)然後運行一下,看看能不能成功消息消息(預設就會執行到Home/Index)。[西瓜程式猿]這邊需要先發佈到伺服器上才能調用,因為只能在伺服器內網訪問,那我這邊發佈一下。
註意:發佈到伺服器上後,也需要將資源包中的【ONSClient4CPP】所有文件拷貝到伺服器站點的【bin】目錄下。
(5)發佈好了,然後運行一下,可以看到是成功了。
然後我們在來看看日誌,提示發送成功了。
最後在去阿裡雲後臺查詢一下是否有這條消息記錄。可以根據消息Key和消息ID兩種方式進行查詢。可以在後臺看到是真正發送成功了。
博客對於圖文有數量限制要求,那這一節先寫到這裡,持續更新中,下一章節有消費者的實現、防踩坑指南等等!
我是西瓜程式猿,感謝大家的閱讀。編寫不易,如果對大家有幫助,用您發財的小手點個贊和關註唄,非常感謝!有問題歡迎聯繫我一起學習與探討~
下一章節:
原文鏈接:https://www.cnblogs.com/kimiliucn/p/17662052.html
版權聲明:本文為原創文章,版權歸 [西瓜程式猿] 所有,轉載請註明出處,有任何疑問請私信咨詢。
原文鏈接:https://www.cnblogs.com/kimiliucn/p/17662052.html