Как воспроизвести определенным образом c в CQRS / Event-Sourcing? - PullRequest
4 голосов
/ 04 февраля 2020

В системах на основе CQRS / ES вы храните события в хранилище событий. Эти события относятся к агрегату и имеют порядок относительно агрегата, к которому они принадлежат. Кроме того, агрегаты являются границами согласованности / транзакций, что означает, что любые транзакционные гарантии предоставляются только на уровне отдельных агрегатов.

Теперь предположим, что у меня есть модель чтения, которая использует события из , кратные агрегаты (что прекрасно, AFAIK). Чтобы иметь возможность воспроизводить модель чтения детерминированным образом c, события должны иметь своего рода глобальное упорядочение по агрегатам - в противном случае вы не знаете, воспроизводить ли события для агрегата A до или после событий для B, или как их смешивать.

Самое простое решение для достижения этой цели - использование метки времени для событий, но обычно метки времени недостаточно точны (или, другими словами, не все базы данных созданы равными). ). Другой вариант - использовать глобальную последовательность, но это плохо влияет на производительность и мешает масштабированию.

Как решить эту проблему? Или мое базовое c предположение, что повторы чтения моделей должны быть детерминированными c, не так?

Ответы [ 2 ]

2 голосов
/ 04 февраля 2020

Я вижу эти опции:

  • Глобальная последовательность

    • , если ваша база данных позволяет это сделать, вы можете использовать метку времени + aggregateId + aggregateVersion как индекс. Обычно это плохо работает в случае распределенной базы данных.

    • в распределенной базе данных вы можете использовать векторные часы , чтобы получить глобальную последовательность без блокировки.

  • Последовательность событий внутри каждой модели считывания. Вы можете в буквальном смысле сохранить все события в модели чтения и отсортировать их, как вы хотите, прежде чем применять функцию проекции.

  • Разрешить недетерминизм и справиться с ним. Например, в вашем примере, если нет группы, когда приходит событие add_user - просто создайте пустую запись группы для модели чтения и добавьте пользователя. И когда наступит событие create_group - обновите запись группы. В конце концов, вы проверили в пользовательском интерфейсе и / или обработчике команд, что существует группа с этим aggregateId, верно?

1 голос
/ 04 февраля 2020

Как вы решаете эту проблему?

Это известная проблема, и, конечно, ни простые метки времени, ни глобальная последовательность, ни наивные методы не помогут.
Использование векторные часы со слабой отметкой времени для перечисления ваших событий и векторным курсором для их чтения. Это гарантирует некоторый стабильный детерминированный порядок c для смешивания событий между агрегатами. Это будет работать, даже если каждый поток имеет разрыв синхронизации часов, что является обычным случаем использования для кластеров базы данных, поскольку идеальная синхронизация меток времени невозможна.
Также это автоматически дает возможность беспрепятственно смешивать события чтения из хранилища событий и шины событий позже, и исключает любые блокировки базы данных между различными агрегатными событиями.

Черновик алгоритма:
1) Определите реальное количество одновременных транзакций в вашей базе данных, например, максимальное количество работников в кластере.
Поскольку каждое событие было записано только в одной транзакции в одном потоке вы можете определить его уникальный идентификатор как кортеж (thread number, thread counter), где счетчик потока - это количество транзакций, обработанных в текущем потоке.
Рассчитать слабую временную метку события как MAX(thread timestamp, aggregate timestamp), где совокупная временная метка - это временная метка последнее событие для текущего агрегата.

2) Подготовить векторный курсор для чтения событий через границу номера потока. Чтение событий из каждого потока последовательно, пока промежуток времени не превысит допустимое значение. Допустимый слабый промежуток времени - это обмен между производительностью чтения событий и сохранением собственного порядка событий.
Минимальное значение - это разность времени синхронизации потоков кластера, поэтому события поступают в порядке собственного смешанного агрегата. Максимальное значение равно бесконечности, поэтому события будут разбиты по совокупности. При использовании СУБД, такой как postgres, это значение может быть автоматически определено с помощью интеллектуального запроса SQL.

Вы можете увидеть референтную реализацию для PostgreSQL базы данных для сохранения событий и загрузка событий . Производительность сохранения событий составляет около 10000 событий в секунду для кластера 4 ГБ ОЗУ RDS Postgres.

...