.Net Core微服務入門全紀錄(六)——EventBus-事件匯流排

来源:https://www.cnblogs.com/xhznl/archive/2020/06/22/13154851.html
-Advertisement-
Play Games

前言 上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了Ocelot + Consul的搭建,這一篇簡單說一下EventBus。 EventBus-事件匯流排 首先,什麼是事件匯流排呢? 貼一段引用: 事件匯流排是對觀察者(發佈-訂閱)模式的一種實現。它是一種 ...


前言

上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了Ocelot + Consul的搭建,這一篇簡單說一下EventBus。

EventBus-事件匯流排

  • 首先,什麼是事件匯流排呢?

貼一段引用:

事件匯流排是對觀察者(發佈-訂閱)模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的。

如果沒有接觸過EventBus,可能不太好理解。其實EventBus在客戶端開發中應用非常廣泛(android,ios,web前端等),用於多個組件(或者界面)之間的相互通信,懂的人都懂。。。

  • 那麼,我們為什麼要用EventBus呢?

就拿當前的項目舉例,我們有一個訂單服務,一個產品服務。客戶端有一個下單功能,當用戶下單時,調用訂單服務的下單介面,那麼下單介面需要調用產品服務的減庫存介面,這涉及到服務與服務之間的調用。那麼服務之間又怎麼調用呢?直接RESTAPI?或者效率更高的gRPC?可能這兩者各有各的使用場景,但是他們都存在一個服務之間的耦合問題,或者難以做到非同步調用。

試想一下:假設我們下單時調用訂單服務,訂單服務需要調用產品服務,產品服務又要調用物流服務,物流服務再去調用xx服務 等等。。。如果每個服務處理時間需要2s,不使用非同步的話,那這種體驗可想而知。

如果使用EventBus的話,那麼訂單服務只需要向EventBus發一個“下單事件”就可以了。產品服務會訂閱“下單事件”,當產品服務收到下單事件時,自己去減庫存就好了。這樣就避免了兩個服務之間直接調用的耦合性,並且真正做到了非同步調用。

既然涉及到多個服務之間的非同步調用,那麼就不得不提分散式事務。分散式事務並不是微服務獨有的問題,而是所有的分散式系統都會存在的問題。
關於分散式事務,可以查一下“CAP原則”和“BASE理論”瞭解更多。當今的分散式系統更多的會追求事務的最終一致性。

下麵使用國人開發的優秀項目“CAP”,來演示一下EventBus的基本使用。之所以使用“CAP”是因為它既能解決分散式系統的最終一致性,同時又是一個EventBus,它具備EventBus的所有功能!
作者介紹:https://www.cnblogs.com/savorboard/p/cap.html

CAP使用

  • 環境準備

在Docker中準備一下需要的環境,首先是資料庫,資料庫我使用PostgreSQL,用別的也行。CAP支持:SqlServer,MySql,PostgreSql,MongoDB。
關於在Docker中運行PostgreSQL可以看我的另一篇博客:https://www.cnblogs.com/xhznl/p/13155054.html

然後是MQ,這裡我使用RabbitMQ,Kafka也可以。
Docker運行RabbitMQ:

docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management

預設用戶:guest,密碼:guest

環境準備就完成了,Docker就是這麼方便。。。

  • 代碼修改:

為了模擬以上業務,需要修改大量代碼,下麵代碼如有遺漏的直接去github找。

NuGet安裝:

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

CAP相關:

DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

Order.API/Controllers/OrdersController.cs增加下單介面:

[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{
    private readonly ILogger<OrdersController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly OrderContext _context;

    public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【訂單服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 下單 發佈下單事件
    /// </summary>
    /// <param name="order"></param>
    /// <returns></returns>
    [Route("Create")]
    [HttpPost]
    public async Task<IActionResult> CreateOrder(Models.Order order)
    {
        using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //業務代碼
            order.CreateTime = DateTime.Now;
            _context.Orders.Add(order);

            var r = await _context.SaveChangesAsync() > 0;

            if (r)
            {
                //發佈下單事件
                await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });
                return Ok();
            }
            return BadRequest();
        }

    }

}

Order.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 產品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    public int Count { get; set; }
}

Order.API/Models/Order.cs訂單實體類:

public class Order
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 下單時間
    /// </summary>
    [Required]
    public DateTime CreateTime { get; set; }

    /// <summary>
    /// 產品ID
    /// </summary>
    [Required]
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    [Required]
    public int Count { get; set; }
}

Order.API/Models/OrderContext.cs資料庫Context:

public class OrderContext : DbContext
{
    public OrderContext(DbContextOptions<OrderContext> options)
       : base(options)
    {

    }

    public DbSet<Order> Orders { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {

    }
}

Order.API/appsettings.json增加資料庫連接字元串:

"ConnectionStrings": {
  "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}

Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<OrderContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是訂單服務的修改。

Product.API/Controllers/ProductsController.cs增加減庫存介面:

[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
    private readonly ILogger<ProductsController> _logger;
    private readonly IConfiguration _configuration;
    private readonly ICapPublisher _capBus;
    private readonly ProductContext _context;

    public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context)
    {
        _logger = logger;
        _configuration = configuration;
        _capBus = capPublisher;
        _context = context;
    }

    [HttpGet]
    public IActionResult Get()
    {
        string result = $"【產品服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
            $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
        return Ok(result);
    }

    /// <summary>
    /// 減庫存 訂閱下單事件
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [NonAction]
    [CapSubscribe("order.services.createorder")]
    public async Task ReduceStock(CreateOrderMessageDto message)
    {
        //業務代碼
        var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
        product.Stock -= message.Count;

        await _context.SaveChangesAsync();
    }

}

Product.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{
    /// <summary>
    /// 產品ID
    /// </summary>
    public int ProductID { get; set; }

    /// <summary>
    /// 購買數量
    /// </summary>
    public int Count { get; set; }
}

Product.API/Models/Product.cs產品實體類:

public class Product
{
    [Key]
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public int ID { get; set; }

    /// <summary>
    /// 產品名稱
    /// </summary>
    [Required]
    [Column(TypeName = "VARCHAR(16)")]
    public string Name { get; set; }

    /// <summary>
    /// 庫存
    /// </summary>
    [Required]
    public int Stock { get; set; }
}

Product.API/Models/ProductContext.cs資料庫Context:

public class ProductContext : DbContext
{
    public ProductContext(DbContextOptions<ProductContext> options)
       : base(options)
    {

    }

    public DbSet<Product> Products { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        //初始化種子數據
        modelBuilder.Entity<Product>().HasData(new Product
        {
            ID = 1,
            Name = "產品1",
            Stock = 100
        },
        new Product
        {
            ID = 2,
            Name = "產品2",
            Stock = 100
        });
    }
}

Product.API/appsettings.json增加資料庫連接字元串:

"ConnectionStrings": {
  "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}

Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));

    //CAP
    services.AddCap(x =>
    {
        x.UseEntityFramework<ProductContext>();

        x.UseRabbitMQ("host.docker.internal");
    });
}


以上是產品服務的修改。

訂單服務和產品服務的修改到此就完成了,看著修改很多,其實功能很簡單。就是各自增加了自己的資料庫表,然後訂單服務增加了下單介面,下單介面會發出“下單事件”。產品服務增加了減庫存介面,減庫存介面會訂閱“下單事件”。然後客戶端調用下單介面下單時,產品服務會減去相應的庫存,功能就這麼簡單。

關於EF資料庫遷移之類的基本使用就不介紹了。使用Docker重新構建鏡像,運行訂單服務,產品服務:

docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"

docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"

最後 Ocelot.APIGateway/ocelot.json 增加一條路由配置:

好了,進行到這裡,整個環境就有點複雜了。確保我們的PostgreSQL,RabbitMQ,Consul,Gateway,服務實例都正常運行。

服務實例運行成功後,資料庫應該是這樣的:




產品表種子數據:

cap.published表和cap.received表是由CAP自動生成的,它內部是使用本地消息表+MQ來實現非同步確保。

運行測試

這次使用Postman作為客戶端調用下單介面(9070是之前的Ocelot網關埠):

訂單庫published表:

訂單庫order表:

產品庫received表:

產品庫product表:

再試一下:

OK,完成。雖然功能很簡單,但是我們實現了服務的解耦,非同步調用,和最終一致性。

總結

註意,上面的例子純粹是為了說明EventBus的使用,實際中的下單流程絕對不會這麼做的!希望大家不要較真。。。

可能有人會說如果下單成功,但是庫存不足導致減庫存失敗了怎麼辦,是不是要回滾訂單表的數據?如果產生這種想法,說明還沒有真正理解最終一致性的思想。首先下單前肯定會檢查一下庫存數量,既然允許下單那麼必然是庫存充足的。這裡的事務是指:訂單保存到資料庫,和下單事件保存到cap.published表(保存到cap.published表理論上就能夠發送到MQ)這兩件事情,要麼一同成功,要麼一同失敗。如果這個事務成功,那麼就可以認為這個業務流程是成功的,至於產品服務的減庫存是否成功那就是產品服務的事情了(理論上也應該是成功的,因為消息已經確保發到了MQ,產品服務必然會收到消息),CAP也提供了失敗重試,和失敗回調機制。

如果非要數據回滾也是能實現的,CAP的ICapPublisher.Publish方法提供一個callbackName參數,當減庫存時,可以觸發這個回調。其本質也是通過發佈訂閱完成,這是不推薦的做法,就不詳細說了,有興趣自己研究一下。
另外,CAP無法保證消息不重覆,實際使用中需要自己考慮一下消息的重覆過濾和冪等性。

這一篇內容有點多,不知道有沒有表達清楚,有問題歡迎評論交流,如有不對之處還望大家指出。

下一篇計劃寫一下授權認證相關的內容。

代碼放在:https://github.com/xiajingren/NetCoreMicroserviceDemo

未完待續...


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 【一、項目目標】 獲取 QQ 音樂指定歌手單曲排行指定頁數的歌曲的歌名、專輯名、播放鏈接。 由淺入深,層層遞進,非常適合剛入門的同學練手。 【二、需要的庫】 主要涉及的庫有:requests、json、openpyxl 【三、項目實現】 1.瞭解 QQ 音樂網站的 robots 協議 只禁止播放列表 ...
  • 0. 前言 這一篇我們將介紹一下.net core 的加密和解密。在Web應用程式中,用戶的密碼會使用MD5值作為密碼數據存儲起來。而在其他的情況下,也會使用加密和解密的功能。 常見的加密演算法分為對稱加密和非對稱加密。所謂的對稱加密是指加密密鑰和解密密鑰是同一個,非對稱加密是值加密密鑰和解密迷藥不同 ...
  • 一、字元串 字元串類型是redis最基礎的數據結構,首先鍵是字元串類型,而且其他幾種結構都是在字元串類型基礎上構建的,所以字元串類型能為其他四種數據結構的學習尊定基礎。字元串類型實際上可以是字元串(簡單的字元串、複雜的字元串(xml、json)、數字(整數、浮點數)、二進位(圖片、音頻、視頻)),但 ...
  • Connection(連接對象):與數據源建立連接。 DataAdapter(適配器對象):對數據源執行操作並返回結果,在DataSet與數據源之間建立通信,將數據源中的數據寫入DataSet中,或根據DataSet中的數據綁定數據源。 DataSet(數據集對象):記憶體中的資料庫,是數據表的集合, ...
  • //判斷後臺返回數據是否沒數據,沒數據DataGrid添加一行 $(this).datagrid('appendRow', { itemid: '<div style="text-align:center;color:red">沒有數據!</div>' }).datagrid('mergeCells ...
  • 前言 在2.2里程碑中我們增加了一些新的功能,正如標題所寫通過請求頭進行導出我們不同格式的文件.下麵我們來看一下如何使用.通過這種方式無論是對我們的數據多用途,還是說對我們的數據校驗都做到了輕鬆易配。 同時我們也將在本周發佈2.3版本,另外3.0版本我們將進行一次大的性能提升。3.0版本我們將對Ra ...
  • 我們利用IIS建立網站的時候,一般都是設定好網站名稱和物理地址,直接下一步建立完成了。正常訪問都沒問題,但如果我們這時候想要更改訪問的IP或者埠號,打開了很多設置項就是沒找到設置的地方。原來它一直在右邊的那個“連接”或者叫“綁定”那裡。 ...
  • 前言 RSA加密演算法是一種非對稱加密演算法,簡單來說,就是加密時使用一個鑰匙,解密時使用另一個鑰匙。 因為加密的鑰匙是公開的,所又稱公鑰,解密的鑰匙是不公開的,所以稱為私鑰。 密鑰 關於RSA加密有很多文章,但幾乎都只介紹了RSACryptoServiceProvider類的使用方法,如果只是走走看看 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...