Это вопрос новичка, потому что у меня нет опыта использования NEventStore, и я пробую его.
Суть этого вопроса связана с концепцией оптимистической проверки параллелизма, как это предусмотрено Грегом Янгом в этом документе , для которого приведен практический пример здесь .
Итак, в моем приложении у меня есть следующий интерфейс, который является абстракцией хранилища событий, которую я собираюсь использовать в реализации репозитория:
public interface IEventStore
{
void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
List<Event> GetEventsForAggregate(Guid aggregateId);
}
Моя цель - обеспечить реализацию IEventStore
с помощью библиотеки NEventStore. Это наивная реализация, где я выполняю своего рода оптимистическую проверку параллелизма для бедного человека :
public class EventStore : IEventStore
{
private readonly IEventStream stream;
public EventStore(IEventStream stream)
{
this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
}
public List<Event> GetEventsForAggregate(Guid aggregateId)
{
// implementation omitted because I'm only interested in understanding how to imeplement save method right now...
}
public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
{
using (var stream = this.store.OpenStream(aggregateId, 0, int.MaxValue))
{
// here, by following Greg Young's paper, I should perform the optimistic concurrency check...
int currentAggregateRevision = stream.StreamRevision;
bool isNewStream = currentAggregateRevision == 0;
if(!isNewStream && expectedVersion != currentAggregateRevision)
{
// DANGER: optimistic concurrency check failed !
throw new ConcurrencyException("The guy that issued the command did not work on the latest version of the aggregate. We cannot commit these events.")
}
foreach(var @event in events)
{
stream.Add(new EventMessage { Body = @event });
}
stream.CommitChanges(Guid.NewGuid()); // Is there a best practice to generate a commit id ? Is it ok to use a new guid ?
}
}
}
Вот мои вопросы:
есть ли способ избежать ручной проверки ожидаемого агрегата
версия перед сохранением нового коммита, используя некоторые возможности
NEventStore?
Я предполагаю, что NEventStore может блокировать поток, пока
ожидаемая агрегатная версия проверяется, так что если другой поток
или узел записи в тот же поток между вызовом OpenStream
и
CommitChanges
a ConcurrencyException
повышается, когда
CommitChanges
называется. Это предположение верно?
ОБНОВЛЕНИЕ 3 ИЮНЯ 2019
Для всех читателей, интересующихся этой темой, я задал один и тот же вопрос на NEventStore github repo . Посмотрите на вопрос, который я открыл , если вы заинтересованы в обсуждении.
ОБНОВЛЕНИЕ 7 ИЮНЯ 2019
Вот окончательная версия кода, которую я придумал, следуя советам, полученным при выпуске github:
public class EventStore : IEventStore
{
private readonly IEventStream stream;
public EventStore(IEventStream stream)
{
this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
}
public List<Event> GetEventsForAggregate(Guid aggregateId)
{
// implementation omitted because I'm only interested in understanding how to imeplement save method right now...
}
public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
{
if(expectedVersion < 0)
{
throws new ArgumentOutOfRangeException(nameof(expectedVersion));
}
bool isNewStream = expectedVersion == 0;
if(isNewStream) {
using (var stream = this.store.CreateStream(aggregateId))
{
foreach(var @event in events)
{
stream.Add(new EventMessage { Body = @event });
}
stream.CommitChanges(Guid.NewGuid()); // throws ConcurrencyException if an aggregate with the same id already exists...
}
} else {
using (var stream = this.store.OpenStream(aggregateId, 0, expectedVersion))
{
foreach(var @event in events)
{
stream.Add(new EventMessage { Body = @event });
}
stream.CommitChanges(Guid.NewGuid()); // throws ConcurrencyException in case of concurrency issues...
}
}
}
}