# 在.NET Core中使用RabbitMQ # 前言 逛園子的時候看到一篇.NET 學習RabbitMq的文章(視頻地址和文章地址放在文章底部了),寫的不錯,我也來實現一下。 我是把RabbitMQ放在伺服器的,然後先說一下如何部署它。 註意:在使用到RabbitMQ的項目中需要安裝Nuget包 ...
在.NET Core中使用RabbitMQ
前言
逛園子的時候看到一篇.NET 學習RabbitMq的文章(視頻地址和文章地址放在文章底部了),寫的不錯,我也來實現一下。
我是把RabbitMQ放在伺服器的,然後先說一下如何部署它。
註意:在使用到RabbitMQ的項目中需要安裝Nuget包
dotnet add package RabbitMQ.Client
伺服器部署
添加management才能有web控制台
ip地址加15672埠訪問
拉取鏡像:
docker pull rabbitmq:management
運行容器:
#方式一:預設guest 用戶,密碼也是 guest
docker run -d --hostname test-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:設置用戶名和密碼
docker run -d --hostname test-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
RabbitMq工作隊列模式
點對點模式
首先創建2個.NET Core控制台程式
- ZY.RabbitMq.Producer 生產者 用於發送消息
- ZY.RabbitMQ.Consumer01 消費者 用於接收消息
- 點對點模式只會有一個消費者進行消費
然後分別創建HelloProducer類和HelloConsumer類
HelloProduce類
public class HelloProducer
{
public static void HelloWorldShow()
{
var factory = new ConnectionFactory();
factory.HostName = "伺服器地址或本機地址";
factory.Port = 5672;//5672是RabbitMQ預設的埠號
factory.UserName = "";
factory.Password = "";
factory.VirtualHost = "/"; //虛擬主機
// 獲取TCP 長連接
using var connection = factory.CreateConnection();
// 創建通信“通道”,相當於TCP中的虛擬連接
using var channel = connection.CreateModel();
/*
* 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列
* 第一個參數:隊列名稱ID
* 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
* 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
* 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列
* 其他額外參數為null
*/
channel.QueueDeclare("Hello", true, false, false, null);
Console.ForegroundColor = ConsoleColor.Red;
string message = "hello CodeMan 666";
var body = Encoding.UTF8.GetBytes(message);
/*
* exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到
* 路由key
* 額外的設置屬性
* 最後一個參數是要傳遞的消息位元組數組
*/
channel.BasicPublish("", "Hello", null, body);
Console.WriteLine($"producer消息:{message}已發送");
}
}
HelloConsumer類
public class HelloConsumer
{
public static void HelloWorldShow()
{
var factory = new ConnectionFactory();
factory.HostName = "地址同上";
factory.Port = 5672;//5672是RabbitMQ預設的埠號
factory.UserName = "guest";
factory.Password = "guest";
factory.VirtualHost = "/";
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
/*
* 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列
* 第一個參數:隊列名稱ID
* 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
* 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
* 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列
* 其他額外參數為null
*/
//RabbitConstant.QUEUE_HELLO_WORLD 對應的生產者一樣名稱 "helloworld.queue"
channel.QueueDeclare("Hello", true, false, false, null);
Console.ForegroundColor = ConsoleColor.Cyan;
EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
// 觸發事件
consumers.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// false只是確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Consumer01接收消息:{message}");
};
/*
* 從MQ伺服器中獲取數據
* 創建一個消息消費者
* 第一個參數:隊列名
* 第二個參數:是否自動確認收到消息,false代表手動確認消息,這是MQ推薦的做法
* 第三個參數:要傳入的IBasicConsumer介面
*
*/
//RabbitConstant.QUEUE_HELLO_WORLD == helloworld.queue
channel.BasicConsume("Hello", false, consumers);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
主程式
然後分別在各自的Main方法中調用函數
HelloConsumer.HelloWorldShow();
HelloProducer.HelloWorldShow();
效果展示
先運行 消費者 然後 運行 生產者
WorkQueues工作隊列
簡述
在Work Queues模式中,消息被均勻地分發給多個消費者。每個消息只會被一個消費者處理,確保每個任務只被一個消費者執行。這種分發方式稱為輪詢(Round-robin)分發。
工作隊列模式的特點包括:
- 併發處理:多個消費者可以並行地處理消息,提高了系統的處理能力。
- 負載均衡:消息會被均勻地分發給多個消費者,避免某個消費者被過載。
- 消息持久化:消息可以持久化存儲在隊列中,確保即使在消費者宕機後,消息也不會丟失。
- 確認機制:消費者可以向消息隊列發送確認消息,告知消息已被處理完成,確保消息的可靠處理。
工作隊列模式適用於任務較重的場景,例如處理大量的計算任務、耗時的IO操作等。它可以提高系統的併發處理能力和可靠性。
總結:Work Queues模式是一種消息隊列的模式,它實現了任務的併發處理和負載均衡,適用於處理任務較重的場景。通過使用RabbitMQ的Work Queues模式,你可以提高系統的吞吐量和響應速度。
與點對點模式代碼幾乎一樣,首先,需要將連接RabbitMq的代碼封裝一下。ZY.RabbitMq.Common類庫,然後創建一個RabbitUtils工具類,代碼如下:
public class RabbitUtils
{
public static ConnectionFactory GetConnection()
{
var factory = new ConnectionFactory
{
HostName = "ip地址",
Port = 5672, //5672是RabbitMQ預設的埠號
UserName = "",
Password = "",
VirtualHost = "/" //虛擬主機,可以在管理端更改
};
return factory;
}
}
完成工具類後,讓Producer 和 Consumer項目引用該類庫。
然後創建一個Sms類,代碼如下
public class Sms
{
public Sms(string passenger, string phone, string msg)
{
Passenger = passenger;
Phone = phone;
Msg = msg;
}
public string Passenger { get; set; }
public string Phone { get; set; }
public string Msg { get; set; }
}
Producer
在之前的Producer項目中添加SmsSender類,並創建Sender靜態方法用於模擬發送消息,代碼如下:
public class SmsSender
{
public static void Sender()
{
using var connection = RabbitUtils.GetConnection().CreateConnection();
using var channel =connection.CreateModel();
channel.QueueDeclare("ticket", true, false, false, null);
for (int i = 0; i < 1000; i++)
{
Sms sms = new Sms("乘客" + i, "1390101" + i, "您的車票已預訂成功!");
string jsonSms = JsonConvert.SerializeObject(sms);
var body = Encoding.UTF8.GetBytes(jsonSms);
channel.BasicPublish("", "ticket", null, body);
Console.WriteLine($"正在發送內容{jsonSms}");
}
Console.WriteLine("發送數據成功");
}
}
Consumer
在消費者中創建SmsReceive類和Sender靜態方法用於接收消息,代碼如下:
public class SmsReceive
{
public static void Sender()
{
var connection = RabbitUtils.GetConnection().CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("ticket", true, false, false, null);
// 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
// basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的
channel.BasicQos(0, 1, false);//處理完一個取一個
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Thread.Sleep(30);
Console.WriteLine($"SmsSender-發送簡訊成功:{message}");
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume("ticket", false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
不用與之前的是現在我們需要新增一個消費者用於測試,創建ZY.RabbitMQ.Consumer02 控制台程式,並且和上面一樣創建類和方法。代碼和Consumer01是一樣的,但是延時Thread.Sleep(60)
主程式
依舊在各自的Main方法中調用
//消費者調用
SmsSender.Sender();
//生產者調用
SmsReceive.Sender();
沒有延時的效果展示
有延時的演示效果
可以明顯的看到因為消費者01的處理效率高,所以他處理的消息比消費者02更多。
讓消費能力強的消費更多
- 消費者1比消費者2的效率要快,一次任務的耗時較短,(延時30毫秒)
- 消費者2大量時間處於空閑狀態,消費者1一直忙碌,(延時60毫秒)
通過設置channel.BasicAck(ea.DeliveryTag, false)
,來讓處理能力強的去消費更多。MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的。
發佈訂閱模式
簡述
Publish/Subscribe Pattern
在訂閱模式中,多了一個Exchange角色。
Exchange:交換機,一方面接收生產者發送的消息,另一方面,知道如何處理消息,例如遞交給某個特別的隊列、遞交給所以隊列、或是將消息丟棄。如何操作,取決於Exchang的類型。
Exchange常見類型:
- Fanout:廣播,將消息給所有綁定到交換機的隊列
- Direct:定向,把消息交給符合指定routing key的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列
下麵是發佈訂閱模式的基本工作流程:
- 創建一個交換機(Exchange)。
- 啟動多個消費者,每個消費者都創建一個隊列(Queue)並將其綁定到交換機上。
- 啟動消息發佈者,將消息發送到交換機上。
- 每個消費者從自己的隊列中獲取消息,併進行處理。
Exchange只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那麼消息會丟失!
下方代碼為Fanout廣播類型的交換機
生產者
在Producer中創建WeatherFanout類和Weather靜態方法,發送簡單的天氣信息。
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
string message = "20度";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null,body);
Console.WriteLine("天氣信息發送成功!");
}
}
}
}
可以看到上述的代碼沒有聲明隊列,隊列交給消費者去實現,後續需要將隊列和交換機進行綁定
消費者01
創建WeatherFanout類和Weather靜態方法
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
/*
* queueBind 用於將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key(暫時用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"百度收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
消費者02
public class RabbitConstant
{
public const string QUEUE_HELLO_WORLD = "helloworld.queue";
public const string QUEUE_SMS = "sms.queue";
public const string EXCHANGE_WEATHER = "weather.exchange";
public const string **QUEUE_BAIDU** = "baidu.queue";
public const string **QUEUE_SINA** = "sina.queue";
public const string EXCHANGE_WEATHER_ROUTING = "weather.routing,exchange";
public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange";
}
修改隊列名為新浪的即可
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
效果展示
可以看到百度和新浪都收到消息了,原因是因為他們綁定了同一個交換機RabbitConstant.EXCHANGE_WEATHER
Routing路由模式
簡述
RabbitMQ的路由模式(Routing)是一種消息傳遞模式,它允許發送者將消息發送到特定的隊列,而不是發送到所有隊列。這種模式使用了交換機(Exchange)來實現消息的路由和分發。
在路由模式中,有三個關鍵組件:
- 生產者(Producer):負責發送消息到交換機,並指定路由鍵。
- 交換機(Exchange):接收生產者發送的消息,並根據路由鍵將消息路由到一個或多個與之綁定的隊列。
- 隊列(Queue):接收交換機發送的消息,併進行消費。
在路由模式中,交換機的類型通常為
direct
,它根據路由鍵的完全匹配將消息路由到與之綁定的隊列。每個隊列可以綁定多個路由鍵。
- 隊列與交換機綁定,不能是任意綁定,而是要指定一個RoutingKey
- 消息的發送方向Exchange發送消息時,必須指定消息的RoutingKey
- Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收消息。
生產者
在Producer項目中新增WeatherDirect類和Weather靜態方法,用於發送消息到交換機中
public class WeatherDirect
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("氣象信息發送成功!");
}
}
}
}
消費者01
消費者接收消息不同於之前的是除了指定隊列名和交換機名還需指定路由key,也就是RoutingKey
消費者02
代碼同上,修改綁定部分的代碼就行了,綁定為RabbitConstant.QUEUE_SINA隊列,交換機一樣,routingkey換成area中剩餘的key就行了
效果展示
可以看到百度和新浪收到的消息是根據routingkey路由去獲取的消息
Topics通配符模式
簡述
Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符。
RoutingKey一般都是由一個或多個單片語成,多個單詞之間以“.”分隔,例如:item.insert
通配符規則:
*
:匹配一個單詞,可以是任意字元串。#
:匹配零個或多個單詞,可以是任意字元串。
生產者
public class WeatherTopic
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("氣象信息發送成功!");
}
}
}
}
消費者01
代碼與路由模式相差不大,只需修改路由key即可
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
匹配路由key為china.開頭的所有信息,匹配如下
area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
消費者02
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");
匹配路由key為china.hubei.任意字元串.20210525的信息,匹配如下
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");=
效果展示
簡單總結
可以看到上面所講的幾種工作模式,很多地方代碼重覆了,是因為上述代碼只是用於學習測試,實際開發中,我們需要將RabiitMq封裝起來使用,具體實現看下方 ↓
doNetCore中使用
這裡使用的是發佈訂閱模式,簡單的封裝了一下連接RabiitMq
為項目安裝Nuget包
dotnet add package RabbitMQ.Client
創建Asp.Net Core WebApi項目,在appsettings.json中配置連接信息
"RabbitMQ": {
"Hostname": "localhost",
"Port": "埠",
"Username": "用戶名",
"Password": "密碼"
}
創建一個連接類
public class RabbitMQSettings
{
public string Hostname { get; set; }
public string Port { get; set; }
public string Username { get; set; }
public string Password { get; set; }
}
然後在program.cs配置文件中配置如下代碼
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
然後創建RabbitMQ的封裝類,用於處理與RabbitMQ的連接、通道、隊列等操作
public class RabbitMQConnectionFactory :IDisposable
{
private readonly RabbitMQSettings _settings;
private IConnection _connection;
public RabbitMQConnectionFactory (IOptions<RabbitMQSettings> settings)
{
_settings = settings.Value;
}
public IModel CreateChannel()
{
if (_connection == null || _connection.IsOpen == false)
{
var factory = new ConnectionFactory()
{
HostName = _settings.Hostname,
UserName = _settings.Username,
Password = _settings.Password
};
_connection = factory.CreateConnection();
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_connection != null)
{
if (_connection.IsOpen)
{
_connection.Close();
}
_connection.Dispose();
}
}
}
創建一個簡單的發送消息的服務
public class MessageService
{
private readonly RabbitMQConnectionFactory _connectionFactory;
public MessageService(RabbitMQConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public void SendMessage(string message)
{
using (var channel = _connectionFactory.CreateChannel())
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Test",
routingKey:"",
basicProperties: null,
body: body);
}
}
}
然後添加一個控制器用於測試發送消息
[Route("api/[controller]")]
[ApiController]
public class MessageController : ControllerBase
{
private readonly MessageService _messageService;
public MessageController(MessageService messageService)
{
_messageService = messageService;
}
[HttpPost]
public IActionResult Post([FromBody] string message)
{
_messageService.SendMessage(message);
return Ok();
}
}
在配置文件中註入服務
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddSingleton<RabbitMQConnectionFactory >();
builder.Services.AddTransient<MessageService>();
最後創建一個控制台程式用於測試消息接收
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 埠,
UserName = "用戶名",
Password = "密碼"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("Test", ExchangeType.Fanout);
channel.QueueDeclare(queue: "my_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("my_queue", "Test", "");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("接收到消息 {0}", message);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "my_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
效果展示
總結
- 通過設置
channel.BasicAck(ea.DeliveryTag, false)
,來讓處理能力強的去消費更多 - channel.BasicQos(0, 1, false); //處理完一個取一個
- channel.BasicAck(ea.DeliveryTag, false); 確認消息的處理結果,並告知RabbitMQ可以從隊列中刪除該消息
下一篇文章將更新RabbitMQ的延時隊列和死信隊列
- 延時隊列(Delay Queue): 延時隊列用於延遲消息的投遞,即消息在發送後會在隊列中等待一段時間,然後再被消費者接收和處理。延時隊列通常用於實現一些定時任務、延遲任務或者消息重試機制。
- 死信隊列(Dead Letter Queue): 死信隊列用於處理無法被正常消費的消息,即那些無法被消費者成功處理的消息。當消息滿足一定的條件時,例如消息過期、被拒絕、隊列長度超過限制等,這些消息會被投遞到死信隊列中,以便進一步處理或分析。
參考鏈接
- centos-docker安裝rabbitmq https://blog.csdn.net/qq_40408317/article/details/105638053
- .NET6使用RabbitMQ學習 https://www.cnblogs.com/fantasy-ke/p/17555153.html
- RabbitMQ從零到高可用集群 https://www.bilibili.com/video/BV1GU4y1w7Yq/?share_source=copy_web&vd_source=fce337a51d11a67781404c67ec0b5084