Kafka Streams не запускает вывод для объединенных потоков? - PullRequest
0 голосов
/ 27 апреля 2018

У меня есть необработанные потоки из 3 таблиц mysql, 1 основной и двух дочерних таблиц. Я попытался объединить три исходных потока и преобразовать их в один выходной поток. Это работает, если есть какое-либо обновление в родительском потоке, но не запускает вывод, если что-то изменяется в дочернем потоке.

    @StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

    KTable<Long, Parent> parentTable = convertParent(parentStream);
    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

    parentTable
           .leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
           .leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
           .toStream()
        }

Любое новое добавление или обновление в родительском потоке выбирается процессором и соединяется с другим KTable и возвращает его в выходной поток. Но любое добавление или обновление child1stream или child2stream не запускает поток вывода.

Я думал, что, сделав все входные потоки как KTable, они всегда будут хранить изменения, так как все они имеют одинаковый ключ, и любые обновления родительских или дочерних таблиц будут подхвачены объединениями. Но этого не происходит, кто-нибудь может подсказать, чего мне не хватает в этом?

Я уже пробовал соединения KStream-KStream, Stream-KTable, KTable-KTable, но ни одно из них не работало в случае дочерних обновлений.

-Спасибо

Ответы [ 2 ]

0 голосов
/ 27 апреля 2018

Можете ли вы показать, где у вас есть EnableBinding и интерфейс процессора, к которому вы привязываетесь?

Мне это не подходит:

@StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

Вы не указываете привязку для входов. Вам нужно что-то подобное, когда у вас есть несколько входов:

@StreamListener
        public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
        @Input("input2") KStream<Long, Child1> child1Stream,
        @Input("input3") KStream<Long, Child2> child2Stream) {

Каждый из этих входов должен быть определен в интерфейсе процессора. Смотрите здесь для примера: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46

0 голосов
/ 27 апреля 2018

Обратите внимание, как ваши дочерние таблицы создаются из того же потока, что и parentTable:

KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

Не уверен, что делают методы convertChild1 и convertChild2, но разве им не следует указывать соответственно child1Stream и child2Stream в качестве аргумента?

...