| Message Type | Mô Tả |
|---|---|
| CreateNewOrder | Command dùng để tạo đơn hàng với giá trị (`Value`) |
| OrderPlaced | Event báo đơn hàng đã được đặt |
| OrderPaid | Event báo đơn hàng đã hoàn tất thanh toán |
`OrderPlaced` và `OrderPaid`
sẽ được xuất bản lên RabbitMQ, do đó cần viết mapper để chuyển đổi giữa object và message sử dụng JSON serialization:
/// <summary>
/// Converts <see cref="OrderPlaced"/> events to and from <see cref="Message"/> instances
/// using JSON serialization.
/// </summary>
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
    /// <summary>
    /// Builds an event message for the "order-placed" topic whose body is the JSON form of the request.
    /// </summary>
    /// <param name="request">The event to wrap; its <c>Id</c> becomes the message id.</param>
    /// <returns>A message made of the populated header and the JSON body.</returns>
    public Message MapToMessage(OrderPlaced request)
    {
        var envelope = new MessageHeader
        {
            Id = request.Id,
            TimeStamp = DateTime.UtcNow,
            Topic = "order-placed",
            MessageType = MessageType.MT_EVENT,
        };

        string json = JsonSerializer.Serialize(request);
        return new Message(envelope, new MessageBody(json));
    }

    /// <summary>
    /// Rebuilds the event from the raw bytes of the message body.
    /// </summary>
    /// <param name="message">The incoming message whose body holds the JSON payload.</param>
    /// <returns>The deserialized event (the result is assumed to be non-null).</returns>
    public OrderPlaced MapToRequest(Message message) =>
        JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
/// <summary>
/// Converts <see cref="OrderPaid"/> events to and from <see cref="Message"/> instances
/// using JSON serialization.
/// </summary>
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
    /// <summary>
    /// Builds an event message for the "order-paid" topic whose body is the JSON form of the request.
    /// </summary>
    /// <param name="request">The event to wrap; its <c>Id</c> becomes the message id.</param>
    /// <returns>A message made of the populated header and the JSON body.</returns>
    public Message MapToMessage(OrderPaid request)
    {
        var envelope = new MessageHeader
        {
            Id = request.Id,
            TimeStamp = DateTime.UtcNow,
            Topic = "order-paid",
            MessageType = MessageType.MT_EVENT,
        };

        string json = JsonSerializer.Serialize(request);
        return new Message(envelope, new MessageBody(json));
    }

    /// <summary>
    /// Rebuilds the event from the raw bytes of the message body.
    /// </summary>
    /// <param name="message">The incoming message whose body holds the JSON payload.</param>
    /// <returns>The deserialized event (the result is assumed to be non-null).</returns>
    public OrderPaid MapToRequest(Message message) =>
        JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
/// <summary>
/// Handles <see cref="OrderPlaced"/> by logging the order id and value, then delegating
/// to the base handler.
/// </summary>
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
    /// <summary>
    /// Logs the placed order and passes it on to the base implementation.
    /// </summary>
    /// <param name="command">The received event.</param>
    /// <param name="cancellationToken">Token forwarded to the base handler.</param>
    /// <returns>The task produced by the base handler.</returns>
    public override Task<OrderPlaced> HandleAsync(
        OrderPlaced command,
        CancellationToken cancellationToken = default)
    {
        logger.LogInformation(
            "{OrderId} placed with value {OrderValue}",
            command.OrderId,
            command.Value);

        Task<OrderPlaced> next = base.HandleAsync(command, cancellationToken);
        return next;
    }
}
/// <summary>
/// Handles <see cref="OrderPaid"/> by logging the order id, then delegating to the base handler.
/// </summary>
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
    /// <summary>
    /// Logs the paid order and passes it on to the base implementation.
    /// </summary>
    /// <param name="command">The received event.</param>
    /// <param name="cancellationToken">Token forwarded to the base handler.</param>
    /// <returns>The task produced by the base handler.</returns>
    public override Task<OrderPaid> HandleAsync(
        OrderPaid command,
        CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);

        Task<OrderPaid> next = base.HandleAsync(command, cancellationToken);
        return next;
    }
}
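Handler của `CreateNewOrder` mô phỏng xử lý nghiệp vụ:

- Ghi `OrderPlaced` vào outbox.
- Nếu `Value` chia hết cho 3 sẽ ném exception mô phỏng lỗi nghiệp vụ.
- Ngược lại, ghi tiếp `OrderPaid` vào outbox.
- Toàn bộ thao tác được thực hiện trong transaction của `IUnitOfWork`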
.
/// <summary>
/// Handles <see cref="CreateNewOrder"/>: inside one unit-of-work transaction it deposits an
/// <see cref="OrderPlaced"/> and an <see cref="OrderPaid"/> message, rolling everything back
/// when the simulated business rule fails.
/// </summary>
public class CreateNewOrderHandler(
    IAmACommandProcessor commandProcessor,
    IUnitOfWork unitOfWork,
    ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
    /// <summary>
    /// Creates the order and deposits its events, committing only when every step succeeded.
    /// </summary>
    /// <param name="command">The command carrying the order value.</param>
    /// <param name="cancellationToken">Token used to cancel the work in progress.</param>
    /// <returns>The result of the base handler, invoked after a successful commit.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>command.Value</c> is divisible by 3 (simulated business failure).
    /// </exception>
    public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
    {
        await unitOfWork.BeginTransactionAsync(cancellationToken);
        try
        {
            string id = Guid.NewGuid().ToString();
            logger.LogInformation("Creating a new order: {OrderId}", id);
            await Task.Delay(10, cancellationToken);

            _ = await commandProcessor.DepositPostAsync(
                new OrderPlaced { OrderId = id, Value = command.Value },
                cancellationToken: cancellationToken);

            if (command.Value % 3 == 0)
            {
                throw new InvalidOperationException("invalid value");
            }

            _ = await commandProcessor.DepositPostAsync(
                new OrderPaid { OrderId = id },
                cancellationToken: cancellationToken);

            await unitOfWork.CommitAsync(cancellationToken);
        }
        catch (Exception ex)
        {
            // Keep the exception in the log entry so the actual cause is not lost.
            logger.LogError(ex, "Invalid data");

            try
            {
                // The failure may itself be a cancellation; the rollback must still run,
                // so it deliberately does not observe the caller's token.
                await unitOfWork.RollbackAsync(CancellationToken.None);
            }
            catch (Exception rollbackEx)
            {
                // A failing rollback must not hide the exception that caused it.
                logger.LogError(rollbackEx, "Rollback failed");
            }

            throw;
        }

        // Runs after the commit and outside the try block: a failure further down the
        // pipeline must not attempt to roll back a transaction that already completed.
        return await base.HandleAsync(command, cancellationToken);
    }
}
`IUnitOfWork` chia sẻ transaction SQL để đảm bảo tính nguyên tử giữa lưu đơn hàng và ghi tin nhắn vào Outbox. Tin nhắn chỉ được gửi nếu transaction commit thành công.

Cần đảm bảo bảng `OutboxMessages`
tồn tại để lưu trữ tin nhắn gửi ra.
-- Create the outbox table only when it does not exist yet, so the script can be re-run safely.
IF OBJECT_ID('OutboxMessages', 'U') IS NULL
BEGIN
    CREATE TABLE [OutboxMessages] (
        [Id] [BIGINT] NOT NULL IDENTITY,
        [MessageId] UNIQUEIDENTIFIER NOT NULL,
        [Topic] NVARCHAR(255) NULL,
        [MessageType] NVARCHAR(32) NULL,
        [Timestamp] DATETIME NULL,
        [CorrelationId] UNIQUEIDENTIFIER NULL,
        [ReplyTo] NVARCHAR(255) NULL,
        [ContentType] NVARCHAR(128) NULL,
        [Dispatched] DATETIME NULL,
        -- NVARCHAR(MAX) replaces the deprecated NTEXT type for the large text columns.
        [HeaderBag] NVARCHAR(MAX) NULL,
        [Body] NVARCHAR(MAX) NULL,
        PRIMARY KEY ([Id])
    );
END
// Wires the SQL Server outbox, the shared transaction connection provider,
// and the outbox sweeper (batches of 10) into the service activator.
services
    .AddServiceActivator(opt =>
    {
        // Configure subscriptions here if needed.
    })
    .UseMsSqlOutbox(
        new MsSqlConfiguration(ConnectionString, "OutboxMessages"),
        typeof(SqlConnectionProvider),
        ServiceLifetime.Scoped)
    .UseMsSqlTransactionConnectionProvider(typeof(SqlConnectionProvider))
    .UseOutboxSweeper(opt => opt.BatchSize = 10);
- `UseMsSqlOutbox`: Liên kết Outbox với SQL Server.
- `UseOutboxSweeper`: Cấu hình dịch vụ kiểm tra và tái gửi tin nhắn chưa gửi thành công.

`IMsSqlTransactionConnectionProvider` và `IUnitOfWork`
, sử dụng cùng một transaction SQL.
/// <summary>
/// Exposes the connection and transaction held by a <see cref="SqlUnitOfWork"/> through
/// <see cref="IMsSqlTransactionConnectionProvider"/>.
/// </summary>
public class SqlConnectionProvider(SqlUnitOfWork sqlConnection) : IMsSqlTransactionConnectionProvider
{
    private readonly SqlUnitOfWork _unitOfWork = sqlConnection;

    /// <summary>Returns the unit of work's connection.</summary>
    public SqlConnection GetConnection()
    {
        return _unitOfWork.Connection;
    }

    /// <summary>Returns the unit of work's connection wrapped in an already completed task.</summary>
    /// <param name="cancellationToken">Not used; the connection is returned immediately.</param>
    public Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        SqlConnection shared = _unitOfWork.Connection;
        return Task.FromResult(shared);
    }

    /// <summary>Returns the unit of work's current transaction, or <c>null</c> when there is none.</summary>
    public SqlTransaction? GetTransaction()
    {
        return _unitOfWork.Transaction;
    }

    /// <summary>True while the unit of work holds a transaction reference.</summary>
    public bool HasOpenTransaction
    {
        get { return _unitOfWork.Transaction is not null; }
    }

    /// <summary>Always true.</summary>
    public bool IsSharedConnection
    {
        get { return true; }
    }
}
/// <summary>
/// Contract for a unit of work that can begin, commit and roll back a transaction.
/// </summary>
public interface IUnitOfWork
{
    /// <summary>Begins a transaction.</summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <param name="isolationLevel">
    /// Isolation level of the transaction; defaults to <see cref="IsolationLevel.Serializable"/>.
    /// </param>
    Task BeginTransactionAsync(
        CancellationToken cancellationToken,
        IsolationLevel isolationLevel = IsolationLevel.Serializable);

    /// <summary>Commits the transaction.</summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task CommitAsync(CancellationToken cancellationToken);

    /// <summary>Rolls the transaction back.</summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task RollbackAsync(CancellationToken cancellationToken);
}
/// <summary>
/// SQL Server implementation of <see cref="IUnitOfWork"/> that owns a single connection and
/// at most one active transaction at a time.
/// </summary>
/// <remarks>
/// The transaction reference is cleared and disposed once it has been committed or rolled back,
/// so a later <see cref="BeginTransactionAsync"/> starts a fresh transaction and
/// <see cref="Transaction"/> never points at a completed one. The type is disposable so the
/// owning scope can release the connection.
/// </remarks>
public class SqlUnitOfWork(MsSqlConfiguration configuration) : IUnitOfWork, IAsyncDisposable, IDisposable
{
    /// <summary>The connection used by every command and transaction of this unit of work.</summary>
    public SqlConnection Connection { get; } = new(configuration.ConnectionString);

    /// <summary>The active transaction, or <c>null</c> when none is in progress.</summary>
    public SqlTransaction? Transaction { get; private set; }

    /// <summary>
    /// Opens the connection if necessary and begins a transaction, unless one is already active.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel opening the connection.</param>
    /// <param name="isolationLevel">Isolation level for the new transaction.</param>
    public async Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable)
    {
        if (Transaction is not null)
        {
            return;
        }

        if (Connection.State != ConnectionState.Open)
        {
            await Connection.OpenAsync(cancellationToken);
        }

        Transaction = Connection.BeginTransaction(isolationLevel);
    }

    /// <summary>Commits the active transaction, if any, and releases it.</summary>
    /// <param name="cancellationToken">Token used to cancel the commit.</param>
    public async Task CommitAsync(CancellationToken cancellationToken)
    {
        if (Transaction is null)
        {
            return;
        }

        try
        {
            await Transaction.CommitAsync(cancellationToken);
        }
        finally
        {
            // Whether or not the commit succeeded, this transaction object must not be reused.
            await ReleaseTransactionAsync();
        }
    }

    /// <summary>Rolls back the active transaction, if any, and releases it.</summary>
    /// <param name="cancellationToken">Token used to cancel the rollback.</param>
    public async Task RollbackAsync(CancellationToken cancellationToken)
    {
        if (Transaction is null)
        {
            return;
        }

        try
        {
            await Transaction.RollbackAsync(cancellationToken);
        }
        finally
        {
            await ReleaseTransactionAsync();
        }
    }

    /// <summary>
    /// Creates a command on the shared connection, enlisted in the active transaction when there is one.
    /// </summary>
    /// <param name="sql">The command text.</param>
    /// <param name="parameters">Parameters to attach; may be empty.</param>
    /// <param name="cancellationToken">Token used to cancel opening the connection.</param>
    /// <returns>The configured command; the caller is responsible for executing and disposing it.</returns>
    public async Task<SqlCommand> CreateSqlCommandAsync(string sql, SqlParameter[] parameters, CancellationToken cancellationToken)
    {
        if (Connection.State != ConnectionState.Open)
        {
            await Connection.OpenAsync(cancellationToken);
        }

        SqlCommand command = Connection.CreateCommand();
        if (Transaction is not null)
        {
            command.Transaction = Transaction;
        }

        command.CommandText = sql;
        if (parameters.Length > 0)
        {
            command.Parameters.AddRange(parameters);
        }

        return command;
    }

    /// <summary>Disposes the active transaction, if any, and the connection.</summary>
    public async ValueTask DisposeAsync()
    {
        await ReleaseTransactionAsync();
        await Connection.DisposeAsync();
    }

    /// <summary>Disposes the active transaction, if any, and the connection.</summary>
    public void Dispose()
    {
        SqlTransaction? pending = Transaction;
        Transaction = null;
        pending?.Dispose();
        Connection.Dispose();
    }

    // Clears the transaction reference first so the property never exposes a disposed transaction.
    private async ValueTask ReleaseTransactionAsync()
    {
        SqlTransaction? finished = Transaction;
        Transaction = null;
        if (finished is not null)
        {
            await finished.DisposeAsync();
        }
    }
}
// Register the concrete unit of work per scope, then map IUnitOfWork (unless it is already
// registered) to that same scoped SqlUnitOfWork.
services.AddScoped<SqlUnitOfWork>();
services.TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
- `OrderPlaced` và `OrderPaid` được lưu trong bảng outbox rồi mới gửi ra RabbitMQ.
- `IUnitOfWork` và `IMsSqlTransactionConnectionProvider` dùng chung một transaction SQL để tránh tình trạng tin nhắn bị gửi mà dữ liệu không được lưu hoặc ngược lại.