首先我們需要瞭解到分散式事件匯流排是什麼; 分散式事件匯流排是一種在分散式系統中提供事件通知、訂閱和發佈機制的技術。它允許多個組件或微服務之間的協作和通信,而無需直接耦合或瞭解彼此的實現細節。通過事件匯流排,組件或微服務可以通過發佈或訂閱事件來實現非同步通信。 例如,當一個組件完成了某項任務並生成了一個事件 ...
首先我們需要瞭解到分散式事件匯流排是什麼;
分散式事件匯流排是一種在分散式系統中提供事件通知、訂閱和發佈機制的技術。它允許多個組件或微服務之間的協作和通信,而無需直接耦合或瞭解彼此的實現細節。通過事件匯流排,組件或微服務可以通過發佈或訂閱事件來實現非同步通信。
例如,當一個組件完成了某項任務並生成了一個事件,它可以通過事件匯流排發佈該事件。其他相關組件可以通過訂閱該事件來接收通知,並做出相應的反應。這樣,組件之間的耦合就被減輕了,同時也提高了系統的可維護性和可擴展性。
然後瞭解一下RabbitMQ
RabbitMQ
是一種開源的消息代理和隊列管理系統,用於在分散式系統中進行非同步通信。它的主要功能是接收和分發消息,並且支持多種協議,包括AMQP,STOMP,MQTT等。RabbitMQ
通過一個中間層,可以把消息發送者與消息接收者隔離開來,因此消息發送者和消息接收者並不需要在同一時刻線上,並且也不需要互相知道對方的地址。
- RabbitMQ的主要功能包括:
- 消息存儲:RabbitMQ可以將消息存儲在記憶體或硬碟上,以保證消息的完整性。
- 消息路由:RabbitMQ支持消息的路由功能,可以將消息從生產者發送到消費者。
- 消息投遞:RabbitMQ提供了多種消息投遞策略,包括簡單模式、工作隊列、發佈/訂閱模式等。
- 可靠性:RabbitMQ保證消息的可靠性,即消息不會丟失、不重覆、按順序投遞。
- 可擴展性:RabbitMQ支持水平擴展,可以通過增加節點來擴展系統的處理能力。
本文將講解使用RabbitMQ實現分散式事件
實現我們創建一個EventsBus.Contract
的類庫項目,用於提供基本介面,以支持其他實現
在項目中添加以下依賴引用,並且記得添加EventsBus.Contract
項目引用
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>
創建項目完成以後分別創建EventsBusOptions.cs
,IEventsBusHandle.cs
,RabbitMQEventsManage.cs
,ILoadEventBus.cs
,提供我們的分散式事件基本介面定義
EventsBusOptions.cs
:
namespace EventsBus.Contract;
public class EventsBusOptions
{
/// <summary>
/// 接收時異常事件
/// </summary>
public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}
IEventsBusHandle.cs
:
namespace EventsBus.Contract;
public interface IEventsBusHandle<in TEto> where TEto : class
{
Task HandleAsync(TEto eventData);
}
ILoadEventBus.cs
:
namespace EventsBus.Contract;
public interface ILoadEventBus
{
/// <summary>
/// 發佈事件
/// </summary>
/// <param name="eto"></param>
/// <typeparam name="TEto"></typeparam>
/// <returns></returns>
Task PushAsync<TEto>(TEto eto) where TEto : class;
}
EventsBusAttribute.cs
:用於Eto(Eto 是我們按照約定使用的Event Transfer Objects(事件傳輸對象)的尾碼. s雖然這不是必需的,但我們發現識別這樣的事件類很有用(就像應用層上的DTO 一樣))的名稱,對應到RabbitMQ
的通道
namespace EventsBus.RabbitMQ;
[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{
public readonly string Name;
public EventsBusAttribute(string name)
{
Name = name;
}
}
然後可以創建我們的RabbitMQ
實現了,創建EventsBus.RabbitMQ
類庫項目,用於編寫EventsBus.Contract
的RabbitMQ
實現
創建項目完成以後分別創建Extensions\EventsBusRabbitMQExtensions.cs
,Options\RabbitMQOptions.cs
,EventsBusAttribute.cs
,,RabbitMQFactory.cs
,RabbitMQLoadEventBus.cs
Extensions\EventsBusRabbitMQExtensions.cs
:提供我們RabbitMQ擴展方法讓使用者更輕鬆的註入,命名空間使用Microsoft.Extensions.DependencyInjection
,這樣就在註入的時候減少過度使用命名空間了
using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;
namespace Microsoft.Extensions.DependencyInjection;
public static class EventsBusRabbitMQExtensions
{
public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
IConfiguration configuration)
{
services.AddSingleton<RabbitMQFactory>();
services.AddSingleton(typeof(RabbitMQEventsManage<>));
services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
return services;
}
}
Options\RabbitMQOptions.cs
:提供基本的Options
讀取配置文件中並且註入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
的方法是讀取IConfiguration
的名稱為RabbitMQOptions
的配置東西,映射到Options中,具體使用往下看。
using RabbitMQ.Client;
namespace EventsBus.RabbitMQ.Options;
public class RabbitMQOptions
{
/// <summary>
/// 要連接的埠。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
/// 指示應使用的協議的預設值。
/// </summary>
public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;
/// <summary>
/// 地址
/// </summary>
public string HostName { get; set; }
/// <summary>
/// 賬號
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 密碼
/// </summary>
public string Password { get; set; }
}
RabbitMQEventsManage.cs
:用於管理RabbitMQ的數據接收,並且將數據傳輸到指定的事件處理程式
using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace EventsBus.RabbitMQ;
public class RabbitMQEventsManage<TEto> where TEto : class
{
private readonly IServiceProvider _serviceProvider;
private readonly RabbitMQFactory _rabbitMqFactory;
public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
{
_serviceProvider = serviceProvider;
_rabbitMqFactory = rabbitMqFactory;
_ = Task.Run(Start);
}
private void Start()
{
var channel = _rabbitMqFactory.CreateRabbitMQ();
var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
var name = eventBus?.Name ?? typeof(TEto).Name;
channel.QueueDeclare(name, false, false, false, null);
var consumer = new EventingBasicConsumer(channel); //消費者
channel.BasicConsume(name, true, consumer); //消費消息
consumer.Received += async (model, ea) =>
{
var bytes = ea.Body.ToArray();
try
{
// 這樣就可以實現多個訂閱
var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
foreach (var handle in events)
{
await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
}
}
catch (Exception e)
{
EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
}
};
}
}
RabbitMQFactory.cs
:提供RabbitMQ
鏈接工廠,在這裡你可以自己去定義和管理RabbitMQ
工廠
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace EventsBus.RabbitMQ;
public class RabbitMQFactory : IDisposable
{
private readonly RabbitMQOptions _options;
private readonly ConnectionFactory _factory;
private IConnection? _connection;
public RabbitMQFactory(IOptions<RabbitMQOptions> options)
{
_options = options?.Value;
// 將Options中的參數添加到ConnectionFactory
_factory = new ConnectionFactory
{
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
Port = _options.Port
};
}
public IModel CreateRabbitMQ()
{
// 當第一次創建RabbitMQ的時候進行鏈接
_connection ??= _factory.CreateConnection();
return _connection.CreateModel();
}
public void Dispose()
{
_connection?.Dispose();
}
}
RabbitMQLoadEventBus.cs
:用於實現ILoadEventBus.cs
通過ILoadEventBus
發佈事件RabbitMQLoadEventBus.cs
是RabbitMQ的實現
using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
namespace EventsBus.RabbitMQ;
public class RabbitMQLoadEventBus : ILoadEventBus
{
private readonly IServiceProvider _serviceProvider;
private readonly RabbitMQFactory _rabbitMqFactory;
public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
{
_serviceProvider = serviceProvider;
_rabbitMqFactory = rabbitMqFactory;
}
public async Task PushAsync<TEto>(TEto eto) where TEto : class
{
//創建一個通道
//這裡Rabbit的玩法就是一個通道channel下包含多個隊列Queue
using var channel = _rabbitMqFactory.CreateRabbitMQ();
// 獲取Eto中的EventsBusAttribute特性,獲取名稱,如果沒有預設使用類名稱
var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
var name = eventBus?.Name ?? typeof(TEto).Name;
// 使用獲取的名稱創建一個通道
channel.QueueDeclare(name, false, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
// 將數據序列號,然後發佈
channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生產消息
// 讓其註入啟動管理服務,RabbitMQEventsManage需要手動激活,由於RabbitMQEventsManage是單例,只有第一次激活才有效,
var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
await Task.CompletedTask;
}
}
在這裡我們的RabbitMQ
分散式事件就設計完成了,註:這隻是簡單的一個示例,並未經過大量測試,請勿直接在生產使用;
然後我們需要使用RabbitMQ分散式事件匯流排工具包
使用RabbitMQ分散式事件匯流排的示例
首先我們需要準備一個RabbitMQ,可以在官網自行下載,我就先使用簡單的,通過docker compose
啟動一個RabbitMQ
,下麵提供一個compose文件
version: '3.1'
services:
rabbitmq:
restart: always # 開機自啟
image: rabbitmq:3.11-management # RabbitMQ使用的鏡像
container_name: rabbitmq # docker名稱
hostname: rabbit
ports:
- 5672:5672 # 只是RabbitMQ SDK使用的埠
- 15672:15672 # 這是RabbitMQ管理界面使用的埠
environment:
TZ: Asia/Shanghai # 設置RabbitMQ時區
RABBITMQ_DEFAULT_USER: token # rabbitMQ賬號
RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密碼
volumes:
- ./data:/var/lib/rabbitmq
啟動以後我們創建一個WebApi
項目,項目名稱Demo
,創建完成打開項目文件添加引用
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
</ItemGroup>
<ItemGroup>
<!-- 引用RabbitMQ事件匯流排項目-->
<ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" />
</ItemGroup>
</Project>
修改appsettings.json
配置文件:將RabbitMQ的配置寫上,RabbitMQOptions
名稱對應在EventsBus.RabbitMQ
中的RabbitMQOptions
文件![image-20230211022801105]
在這裡註入的時候將配置註入好了
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"RabbitMQOptions": {
"HostName": "127.0.0.1",
"UserName": "token",
"Password": "dd666666"
}
}
創建DemoEto.cs
文件:
using EventsBus.RabbitMQ;
namespace Demo;
[EventsBus("Demo")]
public class DemoEto
{
public int Size { get; set; }
public string Value { get; set; }
}
創建DemoEventsBusHandle.cs
文件:這裡是訂閱DemoEto
事件,相當於是DemoEto
的處理程式
using System.Text.Json;
using EventsBus.Contract;
namespace Demo;
/// <summary>
/// 事件處理服務,相當於訂閱事件
/// </summary>
public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
{
public async Task HandleAsync(DemoEto eventData)
{
Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
await Task.CompletedTask;
}
}
打開Program.cs
修改代碼: 在這裡註入了事件匯流排服務,和我們的事件處理服務
using Demo;
using EventsBus.Contract;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// 註入事件處理服務
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));
// 註入RabbitMQ服務
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);
var app = builder.Build();
// 只有在Development顯示Swagger
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
// 強制Https
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
創建Controllers\EventBusController.cs
控制器:我們在控制器中註入了ILoadEventBus
,通過調用介面實現發佈事件;
using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;
namespace Demo.Controllers;
[ApiController]
[Route("[controller]")]
public class EventBusController : ControllerBase
{
private readonly ILoadEventBus _loadEventBus;
public EventBusController(ILoadEventBus loadEventBus)
{
_loadEventBus = loadEventBus;
}
/// <summary>
/// 發送信息
/// </summary>
/// <param name="eto"></param>
[HttpPost]
public async Task Send(DemoEto eto)
{
await _loadEventBus.PushAsync(eto);
}
}
然後我們啟動程式會打開Swagger
調試界面:
然後我們發送一下事件:
我們可以看到,在數據發送的時候也同時訂閱到了我們的信息,也可以通過分散式事件匯流排限流等實現,
來自Token的分享
技術交流群:737776595