Spring и Kafka: объедините 3 темы Kafka для создания выходных потоков Kafka - PullRequest
0 голосов
/ 15 апреля 2019

У меня есть требование присоединиться к 3 темам Кафки. Первые две темы A и B будут добавлены с использованием внутреннего соединения, так как ключ сообщения такой же, и генерирует новый поток Kafka с POJO, такой же, как B. Теперь с этим накопленным потоком мне нужно присоединиться к другой теме C, и мне нужно сгруппировать вывод основанный на поле, которое присутствует в C.

Пока у меня есть следующий подход для этого:

KStream - внутреннее соединение KStream для первых двух тем (A и B) Можно ли будет не публиковать этот накопленный вывод по какой-либо теме, но при этом использовать его ниже

KStream - KStream (выше накопленный поток и тема C)

Не могли бы вы предложить лучший подход или примеры, которые я могу посмотреть на аналогичную реализацию в Java.

1 Ответ

0 голосов
/ 15 апреля 2019

Вы можете использовать два последовательных объединения:

KStream streamAB = streamA.join(streamB, ...);
// either:
KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
                           .join(streamC, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
KStream streamABC = streamA.join(streamCnew, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to a new join key
KStream streamABC = streamA.selectKey(...) // set to a new join key
                           .join(streamC, ...);

streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");
...