ShipmentCreatedEvent
và ShipmentStatusUpdatedEvent
.docker-compose.yml
tiêu chuẩn:dotnet add package MassTransit.RabbitMQ
Startup
hoặc Program
:var rabbitMqConfiguration = configuration.GetSection(nameof(RabbitMQConfiguration)).Get<RabbitMQConfiguration>()!;
services.AddMassTransit(busConfig =>{ busConfig.SetKebabCaseEndpointNameFormatter(); busConfig.UsingRabbitMq((context, cfg) => { cfg.Host(new Uri(rabbitMqConfiguration.Host), h => { h.Username(rabbitMqConfiguration.Username); h.Password(rabbitMqConfiguration.Password); }); cfg.ConfigureEndpoints(context); });});
{ "RabbitMQConfiguration": { "Host": "amqp://127.0.0.1:5672", "Username": "guest", "Password": "guest" }}
public async Task<ErrorOr<CreateShipmentResponse>> Handle(CreateShipmentCommand request, CancellationToken cancellationToken){ var shipmentAlreadyExists = await context.Shipments.Where(s => s.OrderId == request.OrderId).AnyAsync(cancellationToken); if (shipmentAlreadyExists) { logger.LogInformation("Shipment for order '{OrderId}' is already created", request.OrderId); return Error.Failure($"Shipment for order '{request.OrderId}' is already created"); }
var shipmentNumber = new Faker().Commerce.Ean8(); var shipment = CreateShipment(request, shipmentNumber); context.Shipments.Add(shipment);
var shipmentCreatedEvent = CreateShipmentCreatedEvent(shipment); await publishEndpoint.Publish(shipmentCreatedEvent, cancellationToken);
await context.SaveChangesAsync(cancellationToken); logger.LogInformation("Created shipment: {@Shipment}", shipment);
return new CreateShipmentResponse(shipment.Number);}
publishEndpoint.Publish
trước khi lưu thay đổi vào database để đảm bảo tính nhất quán sử dụng Outbox pattern.services.AddMassTransit(busConfig =>{ busConfig.AddEntityFrameworkOutbox<EfCoreDbContext>(o => { o.QueryDelay = TimeSpan.FromSeconds(30); o.UsePostgres().UseBusOutbox(); });
busConfig.SetKebabCaseEndpointNameFormatter();
busConfig.UsingRabbitMq((context, cfg) => { cfg.Host(new Uri(rabbitMqConfiguration.Host), h => { h.Username(rabbitMqConfiguration.Username); h.Password(rabbitMqConfiguration.Password); });
cfg.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(5))); cfg.ConfigureEndpoints(context); });});
ShipmentCreatedEvent
PaymentCompletedEvent
PaymentFailedEvent
ShipmentCreatedEvent
:public class ShipmentCreatedConsumer : IConsumer<ShipmentCreatedEvent>{ private readonly IPublishEndpoint publishEndpoint; private readonly ILogger<ShipmentCreatedConsumer> logger;
public ShipmentCreatedConsumer(IPublishEndpoint publishEndpoint, ILogger<ShipmentCreatedConsumer> logger) { this.publishEndpoint = publishEndpoint; this.logger = logger; }
public async Task Consume(ConsumeContext<ShipmentCreatedEvent> context) { var message = context.Message; logger.LogInformation("Received shipment created event: {@Event}", message);
// Xử lý tạo Order và publish event var orderEvent = new OrderCreatedEvent(message.OrderId, DateTime.UtcNow, 100, message.ReceiverEmail); await publishEndpoint.Publish(orderEvent); }}
dotnet add package MassTransit.Azure.ServiceBus.Core
busConfig.UsingAzureServiceBus((context, cfg) =>{ cfg.Host(azureServiceBusConfiguration.ConnectionString);
cfg.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(5))); cfg.ConfigureEndpoints(context);});
Tính năng | Mô tả |
---|---|
Message Routing | Tự động cấu hình topology cho message dựa trên kiểu dữ liệu, đơn giản hóa mô hình pub/sub |
Exception Handling | Retry policies, dead-letter queues và xử lý lỗi sẵn có |
Sagas & State Machines | Quản lý workflow phức tạp bền vững, giữ trạng thái giữa các message |
Request/Response | Hỗ trợ mô hình request/response rời rạc giữa các dịch vụ |
Inbox/Outbox Patterns | Đảm bảo idempotency , tin nhắn được gửi tối thiểu một lần |
Routing Slip Activities | Quản lý luồng công việc phân tán với khả năng bù trừ tác vụ |
Observability | Hỗ trợ OpenTelemetry để theo dõi phân tán end-to-end |
Scheduling | Lập lịch gửi tin nhắn (giống Quartz.NET, Hangfire) |
Dependency Injection | Tích hợp chặt chẽ với DI container của .NET |
Test Harness | Cho phép viết unit/integration test dễ dàng với môi trường giả lập |
Giai đoạn | Thời gian | Nội dung |
---|---|---|
Pre-release MassTransit v9 | Quý 3, 2025 | Bản thử nghiệm cho người dùng |
Phát hành chính thức v9 | Quý 1, 2026 | Chuyển sang giấy phép thương mại |
Ngừng bảo trì MassTransit v8 | Sau năm 2026 | Phiên bản mã nguồn mở đóng băng |
Loại doanh nghiệp | Mức giá |
---|---|
Doanh nghiệp vừa và nhỏ | |
Doanh nghiệp lớn | |