Кафки потоки с использованием контекста вперед от процессора, названного в API DSL - PullRequest
0 голосов
/ 14 ноября 2018

У меня есть процессор, и я хотел бы вызвать в нем context.forward (). Тем не менее, я чувствую, что мне нужно установить топическую тему для того, чтобы это действительно передавалось. Если бы я использовал Toplogy, я бы просто .addSource (), .addProcessor (), .addSink (). Однако с DSL у меня есть StreamsBuilder / KStream. Есть ли способ использовать context.forward () при вызове процессора из dsl?

ПРИМЕЧАНИЕ: мне нужно использовать процессор вместо преобразования, так как у меня есть собственная логика, когда нужно пересылать записи по потоку.

stream.process(() -> new WindowAggregatorProcessor(storeName), storeName);

1 Ответ

0 голосов
/ 14 ноября 2018

stream.process() является терминальной операцией в DSL.Вы можете использовать stream.transform() вместо этого, чтобы получить выходной поток.A Transformer в основном совпадает с Processor.

...