Номер версии в агрегате источника событий? - PullRequest
0 голосов
/ 22 февраля 2019

Я строю микросервисы.Один из моих MicroService использует CQRS и Event Sourcing.События интеграции генерируются в системе, и я сохраняю свои агрегаты в хранилище событий, а также обновляю мою модель чтения.

Мои вопросы: зачем нам нужна версия в агрегате, когда мы обновляем поток событий для этого агрегата?Я прочитал, что нам нужно это для согласованности, и события должны воспроизводиться последовательно, и нам нужно проверить версию перед сохранением (https://blog.leifbattermann.de/2017/04/21/12-things-you-should-know-about-event-sourcing/) Я все еще не могу разобраться с этим, поскольку события вызываются и сохраняются по порядку, поэтомуМне действительно нужен конкретный пример, чтобы понять, какую пользу мы получаем от версии и почему мы даже нуждаемся в ней.

Большое спасибо,

Имран

Ответы [ 2 ]

0 голосов
/ 22 февраля 2019

Позвольте мне описать случай, когда полезны агрегатные версии:

В нашей reSove Framework агрегатная версия используется для оптимистичного управления параллелизмом.

Я объясню это на примере.Допустим, InventoryItem агрегат принимает команды AddItems и OrderItems.AddItems увеличивает количество товаров на складе, OrderItems - уменьшает.Предположим, у вас есть InventoryItem агрегат # 123 с одним событием - ITEMS_ADDED с количеством 5. Агрегат # 123 сообщает, что на складе 5 товаров.

Итак, ваш пользовательский интерфейс показывает пользователям, что их 5товары на складеПользователь А решает заказать 3 предмета, пользователь Б - 4 предмета.Обе команды OrederItems выдают почти одновременно, скажем, пользователь А сначала на пару миллисекунд.

Теперь, если у вас есть один экземпляр агрегата # 123 в памяти, в одном потоке у вас нет проблем - первая команда от пользователя A будет успешной, событие будет применено, состояние скажет количестворавно 2, поэтому вторая команда от пользователя B не будет выполнена.

В распределенной или безсерверной системе, где команды от A и B будут находиться в отдельных процессах, обе команды будут успешными и приведут агрегат в неправильное состояние, если мы не используем какой-либо контроль параллелизма.Есть несколько способов сделать это - пессимистическая блокировка, очередь команд, агрегатное хранилище или оптимистическая блокировка.

Оптимистическая блокировка представляется наиболее простым и практичным решением:

Мы говорим, что когда-либо агрегат имеет версию - число событий в своем потоке.Таким образом, наш агрегат # 123 имеет версию 1.

Когда агрегат генерирует событие, эти данные события имеют агрегатную версию.В нашем случае ITEMS_ORDERED события от пользователей A и B будут иметь агрегированную версию событий 2. Очевидно, что у агрегированных событий должны быть версии, чтобы последовательно увеличиваться.Поэтому нам нужно просто установить ограничение базы данных, чтобы кортеж {aggregateId, aggregateVersion} был уникальным при записи в хранилище событий.

Давайте посмотрим, как наш пример будет работать в распределенной системе с оптимистичным управлением параллелизмом:

  • Пользователь A выдает команду OrderItem для агрегата # 123
  • Агрегат # 123 восстанавливается из событий {version 1, quantity 5}
  • Пользователь B выдает команду OrderItem дляaggregate # 123
  • Еще один экземпляр Aggregate # 123 восстанавливается из событий (версия 1, количество 5)
  • Экземпляр агрегата для пользователя A выполняет команду, успешно, событие ITEMS_ORDERED {aggregateId 123, version 2} isзаписывается в хранилище событий.
  • Экземпляр агрегата для пользователя B выполняет команду, успешно, событие ITEMS_ORDERED {aggregateId 123, version 2} пытается записать ее в хранилище событий и fais с исключением параллелизма.

  • В таком обработчике исключений команда для пользователя B просто повторяет всю процедуру - тогда Агрегат # 123 будет в состоянии {version 2, quantity 2}, и команда будет выполнена правильно.

Надеюсь, это прояснит ситуацию, когда используются агрегатные версии.

0 голосов
/ 22 февраля 2019

Да, это правильно.Вам нужна версия или порядковый номер для согласованности.

Две вещи, которые вы хотите:

  1. Правильный порядок
    Обычно события имеют идемпотентный характер, поскольку в распределенной системе идемпотентные сообщения или событиялегче иметь дело сИдемпотентные сообщения - это те, которые даже при многократном применении дадут один и тот же результат.Обновление регистра с фиксированным значением (скажем, одним) идемпотентно, но увеличение счетчика на единицу - нет.В распределенных системах, когда A отправляет сообщение B, B подтверждает A. Но если B принимает сообщение и из-за какой-то сетевой ошибки подтверждение A теряется, A не знает, получило ли B сообщение, и поэтому отправляет сообщение.снова.Теперь B снова применяет сообщение, и если сообщение не идемпотентно, конечное состояние станет неправильным.Итак, вы хотите идемпотентное сообщение.Но если вы не сможете применить эти идемпотентные сообщения в том же порядке, в котором они были созданы, ваше состояние снова будет неправильным.Это упорядочение может быть достигнуто с помощью идентификатора версии или последовательности.Если ваше хранилище событий является СУБД, вы не можете упорядочить свои события без какого-либо аналогичного ключа сортировки.В Kafka также есть идентификатор смещения, и клиент отслеживает смещение, до которого он израсходовал

  2. Дедупликация
    Во-вторых, что если ваши сообщенияне идемпотент?Или что, если ваши сообщения являются идемпотентными, но потребитель вызывает некоторые внешние сервисы недетерминированным способом.В таких случаях вам нужна семантика точно-один раз, потому что если вы примените одно и то же сообщение дважды, ваше состояние будет неправильным.Здесь также вам нужен идентификатор версии или порядковый номер.Если на стороне потребителя вы отслеживаете идентификатор версии, который вы уже обработали, вы можете выполнить дедупликацию на основе идентификатора.В Kafka вы можете затем сохранить идентификатор смещения на стороне потребителя

Дополнительные пояснения, основанные на комментариях:

Автор рассматриваемой статьи предположил, что СУБДв качестве магазина событий.Ожидается, что идентификатор версии или последовательность событий будут сгенерированы производителем.Поэтому в вашем примере событие «доставлено» будет иметь более высокую последовательность, чем событие «в пути».

Проблема возникает, когда вы хотите обрабатывать события параллельно.Что если один потребитель получает событие «доставлено», а другой - событие «в пути»?Очевидно, вы должны убедиться, что все события определенного заказа обрабатываются одним и тем же потребителем.В Kafka вы решаете эту проблему, выбирая идентификатор заказа в качестве ключа раздела.Поскольку один раздел будет обрабатываться только одним потребителем, вы знаете, что вы всегда получите «в пути» перед доставкой.Но несколько заказов будут распределены по разным потребителям в одной и той же группе потребителей, и поэтому вы выполняете параллельную обработку.

Что касается совокупного идентификатора, я думаю, что это синонимично теме в Kafka.Поскольку автор предполагал хранение в СУБД, ему нужен некоторый идентификатор для разделения различных категорий сообщений.Вы делаете это путем создания отдельных тем в Kafka, а также групп потребителей по агрегату.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...