Отказ от ответственности: это не ответ на ваш вопрос, а предупреждающее сообщение, почему вы не должны делать то, что вы планируете делать.
Хотяброкеры сообщений, такие как RMQ, и библиотеки промежуточного программного обеспечения для обмена сообщениями, такие как MassTransit, идеально подходят для интеграции, поэтому я настоятельно рекомендую не использовать брокеры сообщений для поиска событий.Я могу сослаться на мой старый ответ Источник событий: когда (а не) я должен использовать Очередь сообщений? , которая объясняет причины этого.
Одна из причин, по которой вы оказались -порядок событий никогда не будет гарантирован.
Другая очевидная причина заключается в том, что построение моделей чтения из событий, опубликованных через посредник сообщений, эффективно исключает возможность воспроизведения и создания новых моделей чтения, которые должны были бы начинать обрабатывать события с начала времени, но всеони получают события, которые публикуются сейчас .
Агрегаты образуют границы транзакций, поэтому каждая команда должна гарантировать, что она завершается в одной транзакции.Хотя MT поддерживает промежуточное ПО транзакции , он гарантирует, что вы получите транзакцию только для тех зависимостей, которые их поддерживают, но не для context.Publish(@event)
в теле потребителя, поскольку RMQ не поддерживает транзакции.Вы получаете хороший шанс внести изменения и не получать события на стороне чтения.Итак, практическое правило для хранилищ событий: вы должны иметь возможность подписаться на поток изменений из магазина и не публиковать события из вашего кода, если это не события интеграции, а не события домена.
Для источников событий крайне важно, чтобы каждая модель чтения сохраняла свою собственную контрольную точку в потоке событий, который она проектирует.Брокеры сообщений не дают вам такой силы, так как «контрольная точка» фактически является вашей очередью, и как только сообщение исчезнет из очереди - оно исчезнет навсегда, возвращаться не будет.
По фактическому вопросу:
Вы можете использовать конфигурацию топологии сообщений , чтобы задать одно и то же имя объекта для разных сообщений, после чего они будут опубликованы в одном и том же обмене, ноэто относится к категории «злоупотреблений», как Крис написал на этой странице.Я не пробовал это, но вы определенно можете экспериментировать.Тип CLR сообщений является частью метаданных, поэтому не должно быть проблем с десериализацией.
Но повторное размещение сообщений в одном обмене не даст вам никаких гарантий упорядочения, за исключением того факта, что все сообщения будут доставленыв одну очередь за потребляющий сервис.
Вы должны будете по крайней мере установить фильтр разделения на основе вашего идентификатора агрегата, чтобы предотвратить параллельную обработку нескольких сообщений для одного агрегата.Это, кстати, также полезно для интеграции.Вот как мы это делаем:
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
=> ep.Handler<T>(
c => appService.Handle(c, aggregateStore),
hc => hc.UsePartitioner(8, partition));
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);