Я работаю над Inventory Microservice, где применяется CQRS. У нас есть запас, который можно зарезервировать с определенным идентификатором резервирования. Таким образом, у 1 StockLine может быть только 1 резервирование с таким же идентификатором бронирования. Если сделано 2 резервирования с одинаковым ID резервирования, зарезервированное количество увеличивается для этого резервирования. На стороне чтения, хотя я должен отобразить в той же таблице доступный запас + все резервирования в виде отдельных строк:
Теперь я реализовал оптимистичный параллелизм для модели чтения. И это прекрасно работает. Проверьте код:
public class Handler
{
public async Task Handle(StockReserved @event)
{
var optionsBuilder = new DbContextOptionsBuilder<StockLinesDbContext>();
optionsBuilder.UseSqlServer("Data Source=localhost;Initial Catalog=DBName;Integrated Security=True;");
using (var context = new StockLinesDbContext(optionsBuilder.Options))
{
var available = context.StockLines.Single(p => !p.IsReserved);
var existingReservation = context.StockLines.SingleOrDefault(p => p.Id == @event.ReservationId);
if (existingReservation != null)
{
existingReservation.Quantity += @event.Amount;
}
else
{
var res = new StockLine
{
Id = Guid.NewGuid(),
Quantity = @event.Amount,
IsReserved = true,
ReservationId = @event.ReservationId,
Product = "coke"
};
context.StockLines.Add(res);
}
available.Quantity -= @event.Amount;
var saved = false;
while (!saved)
{
try
{
await context.SaveChangesAsync();
saved = true;
}
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
if (entry.Entity is StockLine && !((StockLine)entry.Entity).IsReserved)
{
var proposedValues = entry.CurrentValues;
var databaseValues = (StockLine)entry.GetDatabaseValues().ToObject();
((StockLine)entry.Entity).Quantity = databaseValues.Quantity - @event.Amount;
// Refresh original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
if (entry.Entity is StockLine && ((StockLine)entry.Entity).IsReserved)
{
var proposedValues = entry.CurrentValues;
var databaseValues = (StockLine)entry.GetDatabaseValues().ToObject();
((StockLine)entry.Entity).Quantity = databaseValues.Quantity + @event.Amount;
// Refresh original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
}
}
}
}
}
}
Теперь я добавил поддержку идемпотентности для этого обработчика. Идея состоит в том, чтобы в той же транзакции попытаться добавить eventId в отдельную таблицу, и если эта вставка завершится неудачно, то есть событие уже обработано.
using (var transaction = connection.BeginTransaction())
{
// Run raw ADO.NET command in the transaction
var command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText =
$"Insert INTO dbo.ReadModel_Idempotency VALUES ('{ReadModelName}', '{@event.EventId}')";
try
{
command.ExecuteNonQuery();
}
catch (Exception ex)
{
Console.WriteLine($"FAILED TO INSERT: {@event.EventId}");
return;
}
После того, как я добавил поддержку идемпотентности, оптимистический параллелизм не работает должным образом, остается много данных о несоответствиях. Полный код можно найти здесь: https://github.com/dmitribodiu/OptimisticConcurrencyTest Есть идеи, почему он не работает после того, как я добавил реализацию Idempotency?