У меня есть необработанные потоки из 3 таблиц mysql, 1 основной и двух дочерних таблиц. Я попытался объединить три исходных потока и преобразовать их в один выходной поток. Это работает, если есть какое-либо обновление в родительском потоке, но не запускает вывод, если что-то изменяется в дочернем потоке.
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
KTable<Long, Parent> parentTable = convertParent(parentStream);
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
parentTable
.leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
.leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
.toStream()
}
Любое новое добавление или обновление в родительском потоке выбирается процессором и соединяется с другим KTable и возвращает его в выходной поток. Но любое добавление или обновление child1stream или child2stream не запускает поток вывода.
Я думал, что, сделав все входные потоки как KTable, они всегда будут хранить изменения, так как все они имеют одинаковый ключ, и любые обновления родительских или дочерних таблиц будут подхвачены объединениями. Но этого не происходит, кто-нибудь может подсказать, чего мне не хватает в этом?
Я уже пробовал соединения KStream-KStream, Stream-KTable, KTable-KTable, но ни одно из них не работало в случае дочерних обновлений.
-Спасибо