Создайте поток Kafka, который возвращает список различных идентификаторов во временном интервале - PullRequest
0 голосов
/ 28 июня 2019

У меня есть поток событий Кафки объектов:

KStream<String, VehicleEventTO> stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)));

Каждый ObjectEvent имеет свойство idType (Long).Мне нужно построить поток, который возвращает различные idTypes в интервал времени (например: 10 минут).Возможно ли использовать KafkaStream DSL?Я не нахожу решения.

Ответы [ 2 ]

0 голосов
/ 28 июня 2019

В зависимости от вашего варианта использования вы ищете оконную агрегацию.DSL потоков Кафки имеет TimeWindowedKStream или SessionWindowdKStream, которые должны быть в состоянии решить вашу проблему.

0 голосов
/ 28 июня 2019

Я не совсем знаю API KafkaStream, но что касается общего потокового API, у вас будет метод, который буферизует сообщения с течением времени (например, buffer, groupedWithin или что-то подобное), где вы можетеукажите время (и / или максимальное количество сообщений).

Тогда ваш поток будет выглядеть примерно так:

KStream stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)))
    .map(record -> record.value().getId()) // assuming you get a stream of records, I don't know the KafkaStreams api
    .groupedWithin(Duration.ofMinutes(10)) // <-- pseudocode, search for correct method 

Тогда вы получите поток, который содержит идентификаторы во времени.

...