Как работает политика секвенирования Axon с точки зрения состояния - PullRequest
0 голосов
/ 29 ноября 2018

В справочном руководстве Axon написано, что

Помимо этих предоставленных политик, вы можете определить свои собственные.Все политики должны реализовывать интерфейс SequencingPolicy.Этот интерфейс определяет единственный метод, getSequenceIdentifierFor, который возвращает идентификатор последовательности для данного события.События, для которых возвращается одинаковый идентификатор последовательности, должны обрабатываться последовательно.События, которые создают другой идентификатор последовательности, могут обрабатываться одновременно.

Более того, в последнем сообщении этого потока говорится, что

сВ политике последовательности вы указываете, какие события должны обрабатываться последовательно.Не имеет значения, находятся ли потоки в одной и той же JVM или в разных.Если политика последовательности возвращает одно и то же значение для 2 сообщений, они гарантированно будут обрабатываться последовательно, даже если вы отслеживаете потоки процессора в нескольких JVM.

Значит ли это, что обработчики событий на самом делебез гражданства?Если да, то как им удается синхронизироваться?Используется ли для этой цели хранилище токенов ?

1 Ответ

0 голосов
/ 30 ноября 2018

Я думаю, это зависит от того, что вы считаете состоянием, но я предполагаю, что с точки зрения вашего взгляда на это, да, реализации EventProcessor в Axon действительно не имеют состояния.

SubscribingEventProcessor получает это события от SubscribableMessageSource (EventBus реализует этот интерфейс), когда они происходят.TrackingEventProcessor извлекает это событие из StreamableMessageSource (EventStore реализует этот интерфейс) на собственном досуге.

Последняя версия для этого должна отслеживать, где онав отношении событий в потоке событий.Эта информация сохраняется в TrackingToken, который сохраняется в TokenStore.Данный TrackingEventProcessor поток может обрабатывать события только в том случае, если он предъявляет требование к TrackingToken для группы обработки, частью которой он является.Следовательно, это гарантирует, что одно и то же событие не будет обработано двумя разными потоками, чтобы случайно обновить одну и ту же модель запроса.TrackingToken также разрешает многопоточность этого процесса, который выполняется сегментированным токеном.Количество сегментов (регулируемое с помощью initialSegmentCount) приводит к разбивке на число штук , TrackingToken для данной группы обработки. С точки зрения TokenStore это означает, чтоу вас будет храниться несколько TrackingToken экземпляров, которые равны количеству сегментов, которые вы установили.

SequencingPolicy Его задача состоит в том, чтобы определить, какие события в потоке принадлежат к какому сегменту.Таким образом, вы можете, например, использовать SequentialPerAggregate SequencingPolicy, чтобы все события с данным агрегированным идентификатором обрабатывались одним сегментом.

...