У меня в приложении обрабатывается поток Kafka:
myStream
.mapValues(customTransformer::transform)
.groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
.windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
.aggregate(CustomCollectorObject::new,
(key, value, aggregate) -> aggregate.collect(value),
Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
.withValueSerde(new CustomCollectorSerde()))
.toStream()
.foreach((k, v) -> /* do something very important */);
Ожидаемое поведение: входящие сообщения группируются по ключу и в течение некоторого интервала времени агрегируются в CustomCollectorObject
.CustomCollectorObject
это просто класс с List
внутри.Через каждые 10 секунд в foreach
я делаю что-то очень важное с моими агрегированными данными.Что очень важно, я ожидаю, что foreach
вызывается каждые 10 секунд!
Фактическое поведение: Я вижу, что обработка в моем foreach
вызывается реже, примерно каждые 30-35секунды, это не имеет большого значения.Что очень важно, я получаю 3-4 сообщения одновременно.
Вопрос: как мне достичь ожидаемого поведения?Мне нужно, чтобы мои данные обрабатывались во время выполнения без каких-либо задержек.
Я пытался установить cache.max.bytes.buffering: 0
, но в этом случае управление окнами вообще не работает.