Как только окно ввода в потоках Kafka с помощью Java-лямда? - PullRequest
0 голосов
/ 24 марта 2019

У меня есть входные данные, которые я получаю, используя Kafka Stream.То, что мне нужно реализовать, - это просто 5-секундное окно и вывести данные в тему Кафки.Тем не менее, я не могу сделать это с помощью лямбды.Может ли кто-нибудь помочь?

Ниже приведено то, что я написал, но я получаю ошибки:

    KTable<TimeWindowedKStream<String, String> , String> result = source.
            groupByKey().windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(5000)));

    result.to(Serdes.String(), Serdes.Long(), "outputtopic");

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

Однако для переменной результата Eclipse выдает ошибку: «Несоответствие типов: невозможно преобразовать TimeWindowedKStream вKTable, String> ".

Также при записи значения результата в другое разделение затмение выдает мне ошибку:« Метод (Serde>, Serde, String) в типе KTable, String> не применимдля аргументов (Serde, Serde, String) ".

Насколько я понимаю, управление окнами не может быть достигнуто без какой-либо агрегации.Однако я просто хочу выводить данные для каждых 5-секундных окон в другую тему вывода.

1 Ответ

1 голос
/ 24 марта 2019

"Несоответствие типов: невозможно преобразовать TimeWindowedKStream в KTable, String>".

Вы должны вызвать некоторую функцию агрегирования на TimeWindowedKStream, чтобы получить таблицу, например.count(), aggregate(...)

"Метод (Serde>, Serde, String) в типе KTable, String> не применим для аргументов (Serde, Serde, String)"

Вы не можете написать в тему, используя KTable, который у вас при первом вызове KTable::toStream().KStream возвращаемое имеет to(...) функцию.

...