Обратите внимание, что сообщения Kafka неизменны и не удаляются после чтения, и если вы читаете / пишете из одного и того же c топа с одним приложением, сообщение будет обрабатываться бесконечно часто (или, точнее, разные его копии. ), если у вас нет условия для «разрыва» цикла.
Кроме того, если, например, 5 сервисов читают из одного и того же topi c, все 5 сервисов получают копию каждого события. И если одна служба ответит на обратную запись, все остальные 4 службы и сама служба записи прочитают сообщение снова. Таким образом, вы получаете некоторое усиление данных.
Если у вас есть разные сервисы, которые последовательно реагируют на исходное входное сообщение, у вас может быть один топи c между каждой парой последовательных сервисов, чтобы действительно построить конвейер.
Наконец, вы говорите, что если логический флаг равен true
, вы хотите преобразовать сообщение и передать его (я предполагаю, что для следующей услуги потребителю). А для false
ничего не нужно делать. Я также предполагаю, что для сообщения только один флаг будет true
, а успешное преобразование также переключает флаг (чтобы разрешить обработку следующей службой). В этом случае лучше всего, если вы можете убедиться, что каждое исходное входное сообщение имеет один и тот же начальный логический флаг, установленный для построения вашего конвейера. Таким образом, только соответствующая служба будет читать сообщения с установленным логическим флагом (вам даже не нужно проверять логический флаг, поскольку ваша запись в восходящем потоке гарантирует, что он установлен; у вас может быть только проверка работоспособности).
Если вы не знаете, какой логический флаг установлен изначально, и все службы читают из одного и того же входа topi c, правильным будет просто отфильтровать сообщение. Если все службы читают все сообщения, 4 службы будут фильтровать сообщение, а одна служба обработает его и выдаст новое сообщение с другим флагом. Для этой архитектуры один topi c может работать : если сообщение обрабатывается всеми службами и все логические флаги ложны (после того, как все службы обработали сообщение), и вы записываете его обратно на вход topi c, все сервисы корректно сбросили бы последнюю копию. Однако использование одного topi c подразумевает большое количество избыточных операций чтения / записи.
Возможно, лучшая архитектура - иметь исходный вход topi c и один дополнительный вход topi c для каждого служба. Вы также можете использовать дополнительную службу «диспетчер», которая считывает исходные темы ввода и branches()
KStream в темы ввода службы в соответствии с логическим флагом. Таким образом, каждая служба будет читать только сообщения с правильным флагом, установленным на true
. Кроме того, каждая служба будет записывать на вход topi c других служб, также используя branch()
после преобразования сообщения, чтобы записать его во входную topi c правильной следующей службы. Наконец, вам понадобится выходной topi c, в который каждая служба сможет писать после того, как сообщение будет полностью обработано.