Я не совсем знаю API KafkaStream
, но что касается общего потокового API, у вас будет метод, который буферизует сообщения с течением времени (например, buffer
, groupedWithin
или что-то подобное), где вы можетеукажите время (и / или максимальное количество сообщений).
Тогда ваш поток будет выглядеть примерно так:
KStream stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)))
.map(record -> record.value().getId()) // assuming you get a stream of records, I don't know the KafkaStreams api
.groupedWithin(Duration.ofMinutes(10)) // <-- pseudocode, search for correct method
Тогда вы получите поток, который содержит идентификаторы во времени.