Реконструировать состояние агрегации с заданного момента времени, используя CQRS и Event Sourcing - PullRequest
0 голосов
/ 01 июня 2019

Давайте предположим, что я хотел бы получить сводную проекцию на данный момент времени, и я использую CQRS и архитектуру на основе Event Sourcing.

Более того, у меня есть база данных для чтения и записи.Первый основан на событиях, и есть несколько сводных прогнозов для использования в пользовательском интерфейсе.

Мой вопрос: каков наилучший подход для восстановления агрегатного состояния с заданного момента времени в такой архитектуре и кратко, как это должно выглядеть с архитектурной точки зрения.

Примечание: Я хочу вернуть такой прогноз на сторону клиента.

Ответы [ 3 ]

1 голос
/ 02 июня 2019

Это зависит от того, ожидаете ли вы много событий на агрегат.

Если вы это сделаете, вы можете использовать снимки.Сохраните состояние агрегата в определенный момент времени.Затем вы можете прочитать этот снимок и применить все события, которые произошли после снимка.

Если вы не ожидаете много событий, просто прочитайте все события с начала потока и восстановите этот агрегат.В этом случае моментальные снимки могут сделать вещи более полными и даже замедлить процесс.

Вот некоторые ресурсы, которые вы можете проверить:

https://blog.jonathanoliver.com/event-sourcing-and-snapshots/ https://martinfowler.com/eaaDev/EventSourcing.html

0 голосов
/ 03 июня 2019

Предполагая, что количество событий для агрегата мало, вы можете ответить на запрос о начале момента, просто направив события для этого агрегата до этого времени в проекцию в памяти. Если ваша проекция опирается только на события одного агрегатного типа, все готово, просто верните проекцию. Если у вас слишком много событий для каждого агрегата, вы можете потенциально сохранять периодические снимки для каждого прогноза агрегата (например, каждые 100 событий для конкретного агрегата).

Проблема в том, что вам также может понадобиться другая информация в вашей проекции из других потоков событий. Это делает это более сложным. Варианты включают в себя:

  • Выборка событий из других определенных потоков, которые влияют на ваш конкретный экземпляр проекции, если их легко обработать, и они не слишком велики
  • Используйте Datomic в качестве хранилища событий и проекций, поскольку оно поддерживает запросы на основе времени и проекции того типа, который вам может понадобиться изначально
  • Сохраните проекцию в базе данных, используя временные функции (SQL: 2011?) Или внедрите функцию времени вручную - при изменении «строки» вместо ее обновления добавьте новую с полем для временного диапазона, в котором она находится действителен с.
    • Это делает запрос очень простым и быстрым
    • Но требует больше памяти - вместо хранилища документов или чего-то подобного вам может понадобиться использовать нормализованную структуру базы данных для проекции с совокупным разбивкой по нескольким временным таблицам, чтобы избежать необходимости дублировать слишком много для каждого изменения в проекции.
0 голосов
/ 03 июня 2019

если у вас есть архитектура Event Sourcing, это означает, что вы уже храните свое событие в хранилище событий «только для добавления». С точки зрения архитектуры вам необходимо десериализовать события и повторно применить их (в проекции памяти), чтобы восстановить состояние вашего агрегата.

Интерфейс EventStore будет выглядеть примерно так:

public interface EventStore {

    public void appendWith(EventStreamId aStartingIdentity, List<DomainEvent> anEvents);

    public void close();

    public EventStream eventStreamSince(EventStreamId anIdentity);

    public EventStream fullEventStreamFor(EventStreamId anIdentity);
}

Затем в своем хранилище вы передаете весь поток событий в агрегат, который будет отвечать за применение проекции в памяти:

public class EventStoreForumRepository
    extends EventStoreProvider
    implements ForumRepository {

    @Override
    public Forum forumOfId(Tenant aTenant, ForumId aForumId) {
        // snapshots not currently supported; always use version 1

        EventStreamId eventId = new EventStreamId(aTenant.id(), aForumId.id());

        EventStream eventStream = this.eventStore().eventStreamSince(eventId);

        Forum forum = new Forum(eventStream.events(), eventStream.version());

        return forum;
    }
}

Тогда совокупная часть:

public abstract class EventSourcedRootEntity {

    private List<DomainEvent> mutatingEvents;
    private int unmutatedVersion;

    public int mutatedVersion() {
        return this.unmutatedVersion() + 1;
    }

    public List<DomainEvent> mutatingEvents() {
        return this.mutatingEvents;
    }

    public int unmutatedVersion() {
        return this.unmutatedVersion;
    }

    protected EventSourcedRootEntity(List<DomainEvent> anEventStream, int aStreamVersion) {

        for (DomainEvent event : anEventStream) {
            this.mutateWhen(event);
        }

        this.setUnmutatedVersion(aStreamVersion);
    }
}

Ваш агрегат должен расширять EventSourcedRootEntity, а EventStore должен манипулировать mutatingEvents при сохранении (сохраняя только новые).

Образцы написаны на Java и взяты из репозитория Вона Вернона, автора книги «Реализация доменного дизайна (IDDD)».

https://github.com/VaughnVernon/IDDD_Samples

...