Правильный способ реализации оптимистичного параллелизма с NEventStore - PullRequest
0 голосов
/ 02 июня 2019

Это вопрос новичка, потому что у меня нет опыта использования NEventStore, и я пробую его.

Суть этого вопроса связана с концепцией оптимистической проверки параллелизма, как это предусмотрено Грегом Янгом в этом документе , для которого приведен практический пример здесь .

Итак, в моем приложении у меня есть следующий интерфейс, который является абстракцией хранилища событий, которую я собираюсь использовать в реализации репозитория:

public interface IEventStore
{
    void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
    List<Event> GetEventsForAggregate(Guid aggregateId);
}

Моя цель - обеспечить реализацию IEventStore с помощью библиотеки NEventStore. Это наивная реализация, где я выполняю своего рода оптимистическую проверку параллелизма для бедного человека :

public class EventStore : IEventStore 
{
    private readonly IEventStream stream;

    public EventStore(IEventStream stream)
    {
        this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    public List<Event> GetEventsForAggregate(Guid aggregateId)
    {
        // implementation omitted because I'm only interested in understanding how to imeplement save method right now...
    }

    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
    {
        using (var stream = this.store.OpenStream(aggregateId, 0, int.MaxValue))
        {

            // here, by following Greg Young's paper, I should perform the optimistic concurrency check...
            int currentAggregateRevision = stream.StreamRevision;
            bool isNewStream = currentAggregateRevision == 0;

            if(!isNewStream && expectedVersion != currentAggregateRevision)
            {
            // DANGER: optimistic concurrency check failed !
            throw new ConcurrencyException("The guy that issued the command did not work on the latest version of the aggregate. We cannot commit these events.")
            }

                    foreach(var @event in events)
            {
            stream.Add(new EventMessage { Body = @event });  
            }

            stream.CommitChanges(Guid.NewGuid()); // Is there a best practice to generate a commit id ? Is it ok to use a new guid ?
        }
    }
}

Вот мои вопросы:

  • есть ли способ избежать ручной проверки ожидаемого агрегата версия перед сохранением нового коммита, используя некоторые возможности NEventStore?

  • Я предполагаю, что NEventStore может блокировать поток, пока ожидаемая агрегатная версия проверяется, так что если другой поток или узел записи в тот же поток между вызовом OpenStream и CommitChanges a ConcurrencyException повышается, когда CommitChanges называется. Это предположение верно?

ОБНОВЛЕНИЕ 3 ИЮНЯ 2019

Для всех читателей, интересующихся этой темой, я задал один и тот же вопрос на NEventStore github repo . Посмотрите на вопрос, который я открыл , если вы заинтересованы в обсуждении.

ОБНОВЛЕНИЕ 7 ИЮНЯ 2019

Вот окончательная версия кода, которую я придумал, следуя советам, полученным при выпуске github:

public class EventStore : IEventStore 
{
    private readonly IEventStream stream;

    public EventStore(IEventStream stream)
    {
        this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    public List<Event> GetEventsForAggregate(Guid aggregateId)
    {
        // implementation omitted because I'm only interested in understanding how to imeplement save method right now...
    }

    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
    {
        if(expectedVersion < 0)
        {
          throws new ArgumentOutOfRangeException(nameof(expectedVersion));
        }

        bool isNewStream = expectedVersion == 0;

        if(isNewStream) {

          using (var stream = this.store.CreateStream(aggregateId))
          {
            foreach(var @event in events)
            {
                stream.Add(new EventMessage { Body = @event });  
            }

            stream.CommitChanges(Guid.NewGuid()); // throws ConcurrencyException if an aggregate with the same id already exists... 
          }

        } else {

          using (var stream = this.store.OpenStream(aggregateId, 0, expectedVersion))
          {
            foreach(var @event in events)
            {
                stream.Add(new EventMessage { Body = @event });  
            }

            stream.CommitChanges(Guid.NewGuid()); // throws ConcurrencyException in case of concurrency issues...
          }

        }
    }
}
...