У меня есть Processor-API Processor, который внутренне перенаправляет в несколько отдельных приемников (подумайте о классификаторе событий, хотя он также имеет логику состояния между событиями).Я думал о соединении позже между двумя из этих тем.После того, как соединение выполнено, я пересылаю обновленную (обогащенную) версию элементов в те темы, к которым я фактически присоединяюсь.
Как бы вы смешали DSL, если в своем коде API процессора вы перенаправляете в несколько приемников?(раковина1, раковина2) что по очереди отправляются в темы?
Полагаю, вы могли бы создать отдельные потоки, например
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
, и создавать их оттуда?Однако это создает больше подтопологий - каковы здесь последствия?
Другая возможность состоит в том, чтобы иметь свое собственное хранилище состояний в API процессора и управлять им там, в том же процессоре (на самом деле я это делаю).Это добавляет сложности к коду, но не будет ли это более эффективным?Например, вы можете удалить данные, которые вы больше не используете (после объединения вы можете переслать новые объединенные данные в приемники, и они больше не имеют права на объединение).Любая другая эффективность Гоча?