Mass Transit: обеспечить порядок обработки сообщений при наличии разных типов сообщений - PullRequest
2 голосов
/ 05 марта 2019

Я новичок в Mass Transit, и я хотел бы понять, может ли это помочь в моем сценарии. Я создаю пример приложения, реализованного с использованием архитектуры источника событий CQRS, и мне нужна служебная шина для отправки событий, созданных стеком команд, к денормализаторам стека запросов.

Предположим, у нас есть один агрегат в нашем домене, назовем его Фотография и два разных события домена: PhotoUploaded и PhotoArchived .

Учитывая этот сценарий, у нас есть два разных типа сообщений, и по умолчанию массовый транзит создает два разных обмена RabbitMq: один для типа сообщения PhotoUploaded и другой для типа сообщения PhotoArchived.

Предположим, у нас есть один денормализатор с именем PhotoDenormalizer : эта служба будет потребителем сообщений обоих типов, потому что модель для чтения фотографий должна обновляться всякий раз, когда фотография загружается или архивируется.

Учитывая топологию массового транзита по умолчанию, будет два разных обмена, поэтому порядок обработки сообщений не может быть гарантирован между событиями разных типов: единственная гарантия, которую мы имеем, состоит в том, что все события одного типа будут обрабатываться в порядке , но мы не можем гарантировать порядок обработки между событиями другого типа (обратите внимание, что, учитывая семантику событий в моем примере, порядок обработки имеет значение).

Как я могу справиться с таким сценарием? Подходит ли Mass Transit с моими потребностями? Я полностью упускаю момент с диспетчеризацией событий домена?

1 Ответ

6 голосов
/ 05 марта 2019

Отказ от ответственности: это не ответ на ваш вопрос, а предупреждающее сообщение, почему вы не должны делать то, что вы планируете делать.

Хотяброкеры сообщений, такие как RMQ, и библиотеки промежуточного программного обеспечения для обмена сообщениями, такие как MassTransit, идеально подходят для интеграции, поэтому я настоятельно рекомендую не использовать брокеры сообщений для поиска событий.Я могу сослаться на мой старый ответ Источник событий: когда (а не) я должен использовать Очередь сообщений? , которая объясняет причины этого.

Одна из причин, по которой вы оказались -порядок событий никогда не будет гарантирован.

Другая очевидная причина заключается в том, что построение моделей чтения из событий, опубликованных через посредник сообщений, эффективно исключает возможность воспроизведения и создания новых моделей чтения, которые должны были бы начинать обрабатывать события с начала времени, но всеони получают события, которые публикуются сейчас .

Агрегаты образуют границы транзакций, поэтому каждая команда должна гарантировать, что она завершается в одной транзакции.Хотя MT поддерживает промежуточное ПО транзакции , он гарантирует, что вы получите транзакцию только для тех зависимостей, которые их поддерживают, но не для context.Publish(@event) в теле потребителя, поскольку RMQ не поддерживает транзакции.Вы получаете хороший шанс внести изменения и не получать события на стороне чтения.Итак, практическое правило для хранилищ событий: вы должны иметь возможность подписаться на поток изменений из магазина и не публиковать события из вашего кода, если это не события интеграции, а не события домена.

Для источников событий крайне важно, чтобы каждая модель чтения сохраняла свою собственную контрольную точку в потоке событий, который она проектирует.Брокеры сообщений не дают вам такой силы, так как «контрольная точка» фактически является вашей очередью, и как только сообщение исчезнет из очереди - оно исчезнет навсегда, возвращаться не будет.

По фактическому вопросу:

Вы можете использовать конфигурацию топологии сообщений , чтобы задать одно и то же имя объекта для разных сообщений, после чего они будут опубликованы в одном и том же обмене, ноэто относится к категории «злоупотреблений», как Крис написал на этой странице.Я не пробовал это, но вы определенно можете экспериментировать.Тип CLR сообщений является частью метаданных, поэтому не должно быть проблем с десериализацией.

Но повторное размещение сообщений в одном обмене не даст вам никаких гарантий упорядочения, за исключением того факта, что все сообщения будут доставленыв одну очередь за потребляющий сервис.

Вы должны будете по крайней мере установить фильтр разделения на основе вашего идентификатора агрегата, чтобы предотвратить параллельную обработку нескольких сообщений для одного агрегата.Это, кстати, также полезно для интеграции.Вот как мы это делаем:

void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
    => ep.Handler<T>(
        c => appService.Handle(c, aggregateStore), 
        hc => hc.UsePartitioner(8, partition));

AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);
...