Я строю конвейер обработки с NServiceBus, но у меня возникают проблемы с конфигурацией распределителей, чтобы сделать каждый шаг в процессе масштабируемым. Вот некоторая информация:
- Конвейер будет иметь главный процесс, который скажет «ОК, время начинать» для WorkItem, который затем запустит процесс, как блок-схема.
- Каждый шаг в блок-схеме может быть вычислительно дорогим, поэтому я хочу иметь возможность масштабировать каждый шаг. Это говорит мне, что для каждого шага нужен Дистрибьютор.
- Я хочу иметь возможность подключать дополнительные мероприятия к событиям позже. Это говорит мне, что мне нужно публиковать () сообщения, когда это сделано, а не отправлять () их.
- Процесс может нуждаться в ветвлении на основе условия. Это говорит мне о том, что процесс должен иметь возможность публиковать более одного типа сообщений.
- В процессе может потребоваться присоединение к вилкам. Я полагаю, что я должен использовать саги для этого.
Надеюсь, эти предположения верны, иначе у меня больше проблем, чем я думал.
Ради простоты, давайте забудем о разветвлении или присоединении и рассмотрим простой конвейер с этапом A, за которым следует шаг B, и заканчивающимся этапом C. Каждый шаг получает своего собственного распределителя и может иметь много узлов, обрабатывающих сообщения.
- Рабочие NodeA содержат процессор IHandleMessages и публикуют EventA
- Рабочие NodeB содержат процессор IHandleMessages и публикуют событие B
- Рабочие NodeC содержат процессор IHandleMessages, и затем конвейер завершается.
Вот соответствующие части файлов конфигурации, где # обозначает номер рабочего (то есть есть входные очереди NodeA.1 и NodeA.2):
NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
<MessageEndpointMappings>
</MessageEndpointMappings>
</UnicastBusConfig>
NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
А вот соответствующие части конфигов дистрибьютора:
Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>
Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>
Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>
Я тестирую с использованием 2 экземпляров каждого узла, и проблема, похоже, возникает в середине на узле B. В основном могут произойти 2 вещи:
- Оба экземпляра Узла B сообщают, что он подписывается на EventA, а также что NodeC.Distrib.Data@MYCOMPUTER подписывается на EventB, который публикует Узел B. В этом случае все отлично работает.
- Оба экземпляра узла B сообщают, что он подписывается на EventA, однако один работник говорит, что NodeC.Distrib.Data@MYCOMPUTER подписывается ДВАЖДЫ, а другой работник не упоминает об этом.
Во втором случае, который, по-видимому, контролируется только тем, как распространитель направляет сообщения подписки, если узел «переизбытка» обрабатывает EventA, все в порядке. Если «underachiever» обрабатывает EventA, то публикация EventB не имеет подписчиков, и рабочий процесс прекращается.
Итак, мои вопросы:
- Возможна ли такая установка?
- Корректна ли конфигурация? Трудно найти какие-либо примеры конфигурации с дистрибьюторами, кроме простой одноуровневой настройки издателя / 2-работника.
- Разумнее было бы иметь один центральный посреднический процесс, который выполняет все операции, не требующие большого количества вычислительных операций, и отправляет сообщения процессам, находящимся за дистрибьюторами, только когда задача выполняется долго и должна быть сбалансирована по нагрузке?
- Тогда узлы с балансировкой нагрузки могли бы просто ответить центральному посреднику, что кажется проще.
- С другой стороны, это противоречит децентрализации, являющейся преимуществом NServiceBus.
- И если это ответ, а событие done долгосрочного процесса - ответ, как сохранить публикацию, обеспечивающую последующую расширяемость опубликованных событий?