在ASP.NET Core上利用MassTransit來集成使用RabbitMQ真的很簡單,代碼也很簡潔。近期因為項目需要,我便在這基礎上再次進行了封裝,抽成了公共方法,使得使用RabbitMQ的調用變得更方便簡潔。那麼,就讓咱們來瞧瞧其魅力所在吧。 ...
在ASP.NET Core上利用MassTransit來集成使用RabbitMQ真的很簡單,代碼也很簡潔。近期因為項目需要,我便在這基礎上再次進行了封裝,抽成了公共方法,使得使用RabbitMQ的調用變得更方便簡潔。那麼,就讓咱們來瞧瞧其魅力所在吧。
MassTransit
先看看MassTransit是個什麼寶貝(MassTransit官網的簡介):
MassTransit是一個免費的開源輕量級消息匯流排,用於使用.NET框架創建分散式應用程式。MassTransit在現有的頂級消息傳輸上提供了一系列廣泛的功能,從而以開發人員友好的方式使用基於消息的會話模式非同步連接服務。基於消息的通信是實現面向服務的體繫結構的可靠且可擴展的方式。
通俗描述:
MassTransit就是一套基於消息服務的高級封裝類庫,下游可聯接RabbitMQ、Redis、MongoDb等服務。
github官網:https://github.com/MassTransit/MassTransit
RabbitMQ
RabbitMQ是成熟的MQ隊列服務,是由 Erlang 語言開發的 AMQP 的開源實現。關於介紹RabbitMQ的中文資料也很多,有需要可以自行查找。我這裡貼出其官網與下載安裝的鏈接,如下:
下載與安裝:http://www.rabbitmq.com/download.html
實現代碼
通過上面的介紹,咱們已對MassTransit與RabbitMQ有了初步瞭解,那麼現在來看看如何在ASP.NET Core上優雅的使用RabbitMQ吧。
1、創建一個名為“RabbitMQHelp.cs”公共類,用於封裝操作RabbitMQ的公共方法,並通過Nuget來管理並引用“MassTransit”與“MassTransit.RabbitMQ”類庫。
2、“RabbitMQHelp.cs”公共類主要對外封裝兩個靜態方法,其代碼如下:
1 using MassTransit; 2 using MassTransit.RabbitMqTransport; 3 using System; 4 using System.Collections.Generic; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 namespace Lezhima.Comm 9 { 10 /// <summary> 11 /// RabbitMQ公共操作類,基於MassTransit庫 12 /// </summary> 13 public class RabbitMQHelp 14 { 15 #region 交換器 16 17 /// <summary> 18 /// 操作日誌交換器 19 /// 同時需在RabbitMQ的管理後臺創建同名交換器 20 /// </summary> 21 public static readonly string actionLogExchange = "Lezhima.ActionLogExchange"; 22 23 24 #endregion 25 26 27 #region 聲明變數 28 29 /// <summary> 30 /// MQ聯接地址,建議放到配置文件 31 /// </summary> 32 private static readonly string mqUrl = "rabbitmq://192.168.6.181/"; 33 34 /// <summary> 35 /// MQ聯接賬號,建議放到配置文件 36 /// </summary> 37 private static readonly string mqUser = "admin"; 38 39 /// <summary> 40 /// MQ聯接密碼,建議放到配置文件 41 /// </summary> 42 private static readonly string mqPwd = "admin"; 43 44 #endregion 45 46 /// <summary> 47 /// 創建連接對象 48 /// 不對外公開 49 /// </summary> 50 private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null) 51 { 52 //通過MassTransit創建MQ聯接工廠 53 return Bus.Factory.CreateUsingRabbitMq(cfg => 54 { 55 var host = cfg.Host(new Uri(mqUrl), hst => 56 { 57 hst.Username(mqUser); 58 hst.Password(mqPwd); 59 }); 60 registrationAction?.Invoke(cfg, host); 61 }); 62 } 63 64 65 /// <summary> 66 /// MQ生產者 67 /// 這裡使用fanout的交換類型 68 /// </summary> 69 /// <param name="obj"></param> 70 public async static Task PushMessage(string exchange, object obj) 71 { 72 var bus = CreateBus(); 73 var sendToUri = new Uri($"{mqUrl}{exchange}"); 74 var endPoint = await bus.GetSendEndpoint(sendToUri); 75 await endPoint.Send(obj); 76 } 77 78 /// <summary> 79 /// MQ消費者 80 /// 這裡使用fanout的交換類型 81 /// consumer必需是實現IConsumer介面的類實例 82 /// </summary> 83 /// <param name="obj"></param> 84 public static void ReceiveMessage(string exchange, object consumer) 85 { 86 var bus = CreateBus((cfg, host) => 87 { 88 //從指定的消息隊列獲取消息 通過consumer來實現消息接收 89 cfg.ReceiveEndpoint(host, exchange, e => 90 { 91 e.Instance(consumer); 92 }); 93 }); 94 bus.Start(); 95 } 96 } 97 } 98
3、“RabbitMQHelp.cs”公共類已經有了MQ“生產者”與“消費者”兩個對外的靜態公共方法,其中“生產者”方法可以在業務代碼中直接調用,可傳遞JSON、對象等類型的參數向指定的交換器發送數據。而“消費者”方法是從指定交換器中進行接收綁定,但接收到的數據處理功能則交給了“consumer”類(因為在實際項目中,不同的數據有不同的業務處理邏輯,所以這裡我們直接就通過IConsumer介面交給具體的實現類去做了)。那麼,下麵我們再來看看消費者里傳遞進來的“consumer”類的代碼吧:
1 using MassTransit; 2 using System; 3 using System.Collections.Generic; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace Lezhima.Storage.Consumer 8 { 9 /// <summary> 10 /// 從MQ接收並處理數據 11 /// 實現MassTransit的IConsumer介面 12 /// </summary> 13 public class LogConsumer : IConsumer<ActionLog> 14 { 15 /// <summary> 16 /// 實現Consume方法 17 /// 接收並處理數據 18 /// </summary> 19 /// <param name="context"></param> 20 /// <returns></returns> 21 public Task Consume(ConsumeContext<ActionLog> context) 22 { 23 return Task.Run(async () => 24 { 25 //獲取接收到的對象 26 var amsg = context.Message; 27 Console.WriteLine($"Recevied By Consumer:{amsg}"); 28 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}"); 29 }); 30 } 31 } 32 } 33
調用代碼
1、生產者調用代碼如下:
1 /// <summary> 2 /// 測試MQ生產者 3 /// </summary> 4 /// <returns></returns> 5 [HttpGet] 6 public async Task<MobiResult> AddMessageTest() 7 { 8 //聲明一個實體對象 9 var model = new ActionLog(); 10 model.ActionLogId = Guid.NewGuid(); 11 model.CreateTime = DateTime.Now; 12 model.UpdateTime = DateTime.Now; 13 //調用MQ 14 await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model); 15 16 return new MobiResult(1000, "操作成功"); 17 }
2、消費者調用代碼如下:
1 using Lezhima.Storage.Consumer; 2 using Microsoft.Extensions.Configuration; 3 using System; 4 using System.IO; 5 6 namespace Lezhima.Storage 7 { 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 var conf = new ConfigurationBuilder() 13 .SetBasePath(Directory.GetCurrentDirectory()) 14 .AddJsonFile("appsettings.json", true, true) 15 .Build(); 16 17 //調用接收者 18 RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange, 19 new LogConsumer() 20 ); 21 22 Console.ReadLine(); 23 } 24 } 25 } 26
總結
1、基於MassTransit庫使得我們使用RabbitMQ變得更簡潔、方便。而基於再次封裝後,生產者與消費者將不需要關註具體的業務,也跟業務代碼解耦了,更能適應項目的需要。
2、RabbitMQ的交換器需在其管理後臺自行創建,而這裡使用的fanout類型是因為其發送速度最快,且能滿足我的項目需要,各位可視自身情況選用不同的類型。fanout類型不會存儲消息,必需要消費者綁定交換器後才會發送給消費者。