.Net Core對於`RabbitMQ`封裝分散式事件匯流排

来源:https://www.cnblogs.com/hejiale010426/archive/2023/02/11/17110806.html
-Advertisement-
Play Games

首先我們需要瞭解到分散式事件匯流排是什麼; 分散式事件匯流排是一種在分散式系統中提供事件通知、訂閱和發佈機制的技術。它允許多個組件或微服務之間的協作和通信,而無需直接耦合或瞭解彼此的實現細節。通過事件匯流排,組件或微服務可以通過發佈或訂閱事件來實現非同步通信。 例如,當一個組件完成了某項任務並生成了一個事件 ...


首先我們需要瞭解到分散式事件匯流排是什麼;

分散式事件匯流排是一種在分散式系統中提供事件通知、訂閱和發佈機制的技術。它允許多個組件或微服務之間的協作和通信,而無需直接耦合或瞭解彼此的實現細節。通過事件匯流排,組件或微服務可以通過發佈或訂閱事件來實現非同步通信。

例如,當一個組件完成了某項任務並生成了一個事件,它可以通過事件匯流排發佈該事件。其他相關組件可以通過訂閱該事件來接收通知,並做出相應的反應。這樣,組件之間的耦合就被減輕了,同時也提高了系統的可維護性和可擴展性。

然後瞭解一下RabbitMQ

RabbitMQ是一種開源的消息代理和隊列管理系統,用於在分散式系統中進行非同步通信。它的主要功能是接收和分發消息,並且支持多種協議,包括AMQP,STOMP,MQTT等。RabbitMQ通過一個中間層,可以把消息發送者與消息接收者隔離開來,因此消息發送者和消息接收者並不需要在同一時刻線上,並且也不需要互相知道對方的地址。

  1. RabbitMQ的主要功能包括:
    1. 消息存儲:RabbitMQ可以將消息存儲在記憶體或硬碟上,以保證消息的完整性。
    2. 消息路由:RabbitMQ支持消息的路由功能,可以將消息從生產者發送到消費者。
    3. 消息投遞:RabbitMQ提供了多種消息投遞策略,包括簡單模式、工作隊列、發佈/訂閱模式等。
    4. 可靠性:RabbitMQ保證消息的可靠性,即消息不會丟失、不重覆、按順序投遞。
    5. 可擴展性:RabbitMQ支持水平擴展,可以通過增加節點來擴展系統的處理能力。

本文將講解使用RabbitMQ實現分散式事件

實現我們創建一個EventsBus.Contract的類庫項目,用於提供基本介面,以支持其他實現

在項目中添加以下依賴引用,並且記得添加EventsBus.Contract項目引用

<ItemGroup>
	<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
    <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

創建項目完成以後分別創建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我們的分散式事件基本介面定義

EventsBusOptions.cs

namespace EventsBus.Contract;

public class EventsBusOptions
{
    /// <summary>
    /// 接收時異常事件
    /// </summary>
    public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;

public interface IEventsBusHandle<in TEto> where TEto : class
{
    Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;

public interface ILoadEventBus
{
    /// <summary>
    /// 發佈事件
    /// </summary>
    /// <param name="eto"></param>
    /// <typeparam name="TEto"></typeparam>
    /// <returns></returns>
    Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用於Eto(Eto 是我們按照約定使用的Event Transfer Objects(事件傳輸對象)的尾碼. s雖然這不是必需的,但我們發現識別這樣的事件類很有用(就像應用層上的DTO 一樣))的名稱,對應到RabbitMQ的通道

namespace EventsBus.RabbitMQ;

[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{
    public readonly string Name;

    public EventsBusAttribute(string name)
    {
        Name = name;
    }
}

然後可以創建我們的RabbitMQ實現了,創建EventsBus.RabbitMQ類庫項目,用於編寫EventsBus.ContractRabbitMQ實現

創建項目完成以後分別創建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我們RabbitMQ擴展方法讓使用者更輕鬆的註入,命名空間使用Microsoft.Extensions.DependencyInjection,這樣就在註入的時候減少過度使用命名空間了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Extensions.DependencyInjection;

public static class EventsBusRabbitMQExtensions
{
    public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddSingleton<RabbitMQFactory>();
        services.AddSingleton(typeof(RabbitMQEventsManage<>));
        services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
        services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
        
        return services;
    }
}

Options\RabbitMQOptions.cs:提供基本的Options 讀取配置文件中並且註入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是讀取IConfiguration的名稱為RabbitMQOptions的配置東西,映射到Options中,具體使用往下看。

using RabbitMQ.Client;

namespace EventsBus.RabbitMQ.Options;

public class RabbitMQOptions
{
    /// <summary>
    /// 要連接的埠。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
    /// 指示應使用的協議的預設值。
    /// </summary>
    public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;

    /// <summary>
    /// 地址
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// 賬號
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 密碼
    /// </summary>
    public string Password { get; set; }
}

RabbitMQEventsManage.cs:用於管理RabbitMQ的數據接收,並且將數據傳輸到指定的事件處理程式

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace EventsBus.RabbitMQ;

public class RabbitMQEventsManage<TEto> where TEto : class
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
        _ = Task.Run(Start);
    }

    private void Start()
    {
        var channel = _rabbitMqFactory.CreateRabbitMQ();
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        channel.QueueDeclare(name, false, false, false, null);
        var consumer = new EventingBasicConsumer(channel); //消費者
        channel.BasicConsume(name, true, consumer); //消費消息
        consumer.Received += async (model, ea) =>
        {
            var bytes = ea.Body.ToArray();
            try
            {
                // 這樣就可以實現多個訂閱
                var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
                foreach (var handle in events)
                {
                    await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
                }
            }
            catch (Exception e)
            {
                EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
            }
        };
    }
}

RabbitMQFactory.cs:提供RabbitMQ鏈接工廠,在這裡你可以自己去定義和管理RabbitMQ工廠

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace EventsBus.RabbitMQ;

public class RabbitMQFactory : IDisposable
{
    private readonly RabbitMQOptions _options;
    private readonly ConnectionFactory _factory;
    private IConnection? _connection;

    public RabbitMQFactory(IOptions<RabbitMQOptions> options)
    {
        _options = options?.Value;
        // 將Options中的參數添加到ConnectionFactory
        _factory = new ConnectionFactory
        {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            Port = _options.Port
        };
    }

    public IModel CreateRabbitMQ()
    {
        // 當第一次創建RabbitMQ的時候進行鏈接
        _connection ??= _factory.CreateConnection();

        return _connection.CreateModel();
    }

    public void Dispose()
    {
        _connection?.Dispose();
    }
}

RabbitMQLoadEventBus.cs:用於實現ILoadEventBus.cs通過ILoadEventBus發佈事件RabbitMQLoadEventBus.cs是RabbitMQ的實現

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;

namespace EventsBus.RabbitMQ;

public class RabbitMQLoadEventBus : ILoadEventBus
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
    }

    public async Task PushAsync<TEto>(TEto eto) where TEto : class
    {

        //創建一個通道
        //這裡Rabbit的玩法就是一個通道channel下包含多個隊列Queue
        using var channel = _rabbitMqFactory.CreateRabbitMQ();
        
        // 獲取Eto中的EventsBusAttribute特性,獲取名稱,如果沒有預設使用類名稱
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        
        // 使用獲取的名稱創建一個通道
        channel.QueueDeclare(name, false, false, false, null);
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 1;
        // 將數據序列號,然後發佈
        channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生產消息
        // 讓其註入啟動管理服務,RabbitMQEventsManage需要手動激活,由於RabbitMQEventsManage是單例,只有第一次激活才有效,
        var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
        
        await Task.CompletedTask;
    }
}

在這裡我們的RabbitMQ分散式事件就設計完成了,註:這隻是簡單的一個示例,並未經過大量測試,請勿直接在生產使用;

然後我們需要使用RabbitMQ分散式事件匯流排工具包

使用RabbitMQ分散式事件匯流排的示例

首先我們需要準備一個RabbitMQ,可以在官網自行下載,我就先使用簡單的,通過docker compose啟動一個RabbitMQ,下麵提供一個compose文件

version: '3.1'
services:
  rabbitmq:
    restart: always # 開機自啟
    image: rabbitmq:3.11-management # RabbitMQ使用的鏡像
    container_name: rabbitmq # docker名稱
    hostname: rabbit
    ports:
      - 5672:5672 # 只是RabbitMQ SDK使用的埠
      - 15672:15672 # 這是RabbitMQ管理界面使用的埠
    environment:
      TZ: Asia/Shanghai # 設置RabbitMQ時區
      RABBITMQ_DEFAULT_USER: token # rabbitMQ賬號
      RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密碼
    volumes:
      - ./data:/var/lib/rabbitmq

啟動以後我們創建一個WebApi項目,項目名稱Demo,創建完成打開項目文件添加引用

<Project Sdk="Microsoft.NET.Sdk.Web">

    <PropertyGroup>
        <TargetFramework>net7.0</TargetFramework>
        <Nullable>enable</Nullable>
        <ImplicitUsings>enable</ImplicitUsings>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />
        <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
    </ItemGroup>

    <ItemGroup>
        <!-- 引用RabbitMQ事件匯流排項目-->
        <ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" />
    </ItemGroup>

</Project>

修改appsettings.json配置文件:將RabbitMQ的配置寫上,RabbitMQOptions名稱對應在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105]

在這裡註入的時候將配置註入好了

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "RabbitMQOptions": {
    "HostName": "127.0.0.1",
    "UserName": "token",
    "Password": "dd666666"
  }
}

創建DemoEto.cs文件:

using EventsBus.RabbitMQ;

namespace Demo;

[EventsBus("Demo")]
public class DemoEto
{
    public int Size { get; set; }
    
    public string Value { get; set; }
}

創建DemoEventsBusHandle.cs文件:這裡是訂閱DemoEto事件,相當於是DemoEto的處理程式

using System.Text.Json;
using EventsBus.Contract;

namespace Demo;

/// <summary>
/// 事件處理服務,相當於訂閱事件
/// </summary>
public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
{
    public async Task HandleAsync(DemoEto eventData)
    {
        Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
        await Task.CompletedTask;
    }
}

打開Program.cs 修改代碼: 在這裡註入了事件匯流排服務,和我們的事件處理服務

using Demo;
using EventsBus.Contract;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// 註入事件處理服務
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));

// 註入RabbitMQ服務
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);

var app = builder.Build();

// 只有在Development顯示Swagger
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// 強制Https
app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

創建Controllers\EventBusController.cs控制器:我們在控制器中註入了ILoadEventBus ,通過調用介面實現發佈事件;

using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;

namespace Demo.Controllers;

[ApiController]
[Route("[controller]")]
public class EventBusController : ControllerBase
{
    private readonly ILoadEventBus _loadEventBus;

    public EventBusController(ILoadEventBus loadEventBus)
    {
        _loadEventBus = loadEventBus;
    }

    /// <summary>
    /// 發送信息
    /// </summary>
    /// <param name="eto"></param>
    [HttpPost]
    public async Task Send(DemoEto eto)
    {
        await _loadEventBus.PushAsync(eto);
    }
}

然後我們啟動程式會打開Swagger調試界面:

然後我們發送一下事件:

我們可以看到,在數據發送的時候也同時訂閱到了我們的信息,也可以通過分散式事件匯流排限流等實現,

來自Token的分享

技術交流群:737776595


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、Typora軟體的下載與使用 (1)、軟體下載 百度網盤windowsx64已破解:https://pan.baidu.com/s/1ksoh9TeprB5LkZ3tKB-AAA?pwd=p6w6 提取碼:p6w6 ios下載地址:https://mac.qdrayst.com/02/Typor ...
  • 1.什麼是函數模版 函數模板,實際上是建立一個通用函數,其函數類型和形參類型不具體制定,用一個虛擬的類型來代表。這個通用函數就成為函數模板 2.怎麼編寫函數模版 //T代表泛型的數據類型,不是只能寫T, template<class T>//讓編譯器看到這句話後面緊跟著的函數里有T不要報錯 void ...
  • 2023-02-10 一、配置SSM環境 1、添加日誌文件 在“shf-parent/web-admin/src/main/resources”下創建“logback.xml” <?xml version="1.0" encoding="UTF-8"?> <configuration debug=" ...
  • 教程簡介 批處理腳本語法 - 從簡單和簡單的步驟學習批處理腳本,從基本到高級概念,包括概述,環境,命令,文件,語法,變數,註釋,字元串,數組,決策,操作符,日期和時間,輸入/輸出,返回代碼,函數,進程,別名,設備,註冊表,網路,列印,調試,日誌記錄。 教程目錄 批處理腳本 - 語法 批處理腳本 - ...
  • 參考:https://blog.csdn.net/weixin_42401159/article/details/112187778 https://cloud.tencent.com/developer/article/1406445 在處理一些自然語言文字的過程中,會遇到一些錶面很奇怪的現象。 ...
  • 一、安裝 kafka是由scala語言寫成的,後面用Java重構了,但是不管怎樣,都要編譯到jvm虛擬機中執行。 centos:yum install java-11-openjdk ubuntu:apt install default-jdk java -version 下載kafka 下載 wg ...
  • 大數據時代,各行各業對數據採集的需求日益增多,網路爬蟲的運用也更為廣泛,越來越多的人開始學習網路爬蟲這項技術,K哥爬蟲此前已經推出不少爬蟲進階、逆向相關文章,為實現從易到難全方位覆蓋,特設【0基礎學爬蟲】專欄,幫助小白快速入門爬蟲,本期為爬蟲的基本介紹。 一、爬蟲概述 爬蟲又稱網路蜘蛛、網路機器人, ...
  • 教程簡介 C#概述 - 從簡單和簡單的步驟學習C#從基本到高級概念,包括概述,環境設置,程式結構,基本語法,數據類型,類型轉換,變數,常量,運算符,決策,迴圈,方法,Nullables ,數組,字元串,結構,枚舉,文件I / O,類,封裝,介面,繼承,命名空間,多態性,運算符重載,封裝,反射,屬性, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...