Я хочу частично собирать элементы в списки в потоках Кафки через определенный промежуток времени или после обработки 1000 элементов.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> kStream = builder.stream(topicTwo);
KTable<String, String> kTable = builder.table(topicOne);
kStream.join(kTable,
(streamValue, tableValue) -> new CustomObject(streamValue, tableValue)
.foreach((key, value) -> System.out.println(value));
KafkaStreams streams = new KafkaStreams(builder, streamProperties);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Это мой код.Я не знаю, достаточно ли я прояснил себя, но я хочу получить List<CustomObject>
после каждых 1000 обработанных элементов или 5 секунд.Возможно ли это?