本文地址源碼 MassTransit是一個面向.net的免費開源分散式應用程式框架。 MassTransit使得創建應用程式和服務變得很容易,這些應用程式和服務利用基於消息的、鬆散耦合的非同步通信來獲得更高的可用性、可靠性和可伸縮性。 MassTransit 8.x版本。 實現簡單發佈訂閱 添加Nug ...
- MassTransit是一個面向.net的免費開源分散式應用程式框架。
- MassTransit使得創建應用程式和服務變得很容易,這些應用程式和服務利用基於消息的、鬆散耦合的非同步通信來獲得更高的可用性、可靠性和可伸縮性。
- MassTransit 8.x版本。
實現簡單發佈訂閱
- 添加Nuget包引用:
- MassTransit
- MassTransit.RabbitMQ(演示也可基於記憶體)
生產端
- 配置MassTransit
builder.Services.AddMassTransit(x =>
{
// 使用記憶體
//x.UsingInMemory();
// 使用RabbitMq
x.UsingRabbitMq((context, config) =>
{
config.Host("rabbitmq://localhost:5672", host =>
{
host.Username("admin");
host.Password("admin");
});
});
});
- 定義消息體
public class OrderEto
{
public Guid Id { get; init; }
public string Name { get; set; }
public DateTime CreationTime { get; set; }
}
- 發佈消息
[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{
private readonly ILogger<PublishController> _logger;
private readonly IPublishEndpoint _publishEndpoint;
public PublishController(ILogger<PublishController> logger, IPublishEndpoint publishEndpoint)
{
_logger = logger;
_publishEndpoint = publishEndpoint;
}
[HttpGet]
public async Task Get()
{
await _publishEndpoint.Publish<OrderEto>(new OrderEto()
{
Id = Guid.NewGuid(),
Name = "Phone",
CreationTime = DateTime.Now
});
}
}
消費者端
builder.Services.AddMassTransit(x =>
{
// 通過掃描程式集註冊消費者
x.AddConsumers(typeof(Program).Assembly);
// 通過類型單個註冊消費者
// x.AddConsumer<OrderEtoConsumer>(typeof(OrderEtoConsumerDefinition));
// x.SetKebabCaseEndpointNameFormatter();
// 通過泛型單個註冊消費者
//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();
// 通過指定命名空間註冊消費者
// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();
// 使用記憶體隊列
// x.UsingInMemory();
x.UsingRabbitMq((context, config) =>
{
config.Host("rabbitmq://localhost:5672", hostconfig =>
{
hostconfig.Username("admin");
hostconfig.Password("admin");
});
config.ConfigureEndpoints(context);
});
});
- 消費者定義
public class OrderEtoConsumer : IConsumer<OrderEto>
{
private readonly ILogger<OrderEtoConsumer> _logger;
public OrderEtoConsumer(ILogger<OrderEtoConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<OrderEto> context)
{
_logger.LogInformation($"MassTransit.Consumer.One 收到消息:{JsonSerializer.Serialize(context.Message)}");
return Task.CompletedTask;
}
}
public class OrderEtoConsumerDefinition : ConsumerDefinition<OrderEtoConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<OrderEtoConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
}
}