Пакетная агрегация оконных потоков Kafka Streams - PullRequest
0 голосов
/ 24 сентября 2018

У меня в приложении обрабатывается поток Kafka:

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);

Ожидаемое поведение: входящие сообщения группируются по ключу и в течение некоторого интервала времени агрегируются в CustomCollectorObject.CustomCollectorObject это просто класс с List внутри.Через каждые 10 секунд в foreach я делаю что-то очень важное с моими агрегированными данными.Что очень важно, я ожидаю, что foreach вызывается каждые 10 секунд!

Фактическое поведение: Я вижу, что обработка в моем foreach вызывается реже, примерно каждые 30-35секунды, это не имеет большого значения.Что очень важно, я получаю 3-4 сообщения одновременно.

Вопрос: как мне достичь ожидаемого поведения?Мне нужно, чтобы мои данные обрабатывались во время выполнения без каких-либо задержек.

Я пытался установить cache.max.bytes.buffering: 0, но в этом случае управление окнами вообще не работает.

1 Ответ

0 голосов
/ 26 сентября 2018

Kafka Streams имеет другую модель исполнения и предоставляет другую семантику, т. Е. Ваши ожидания не соответствуют тому, что делает Kafka Streams.Уже есть несколько похожих вопросов:

Также обратите внимание, что в настоящее время сообщество работает над новым оператором под названием suppress(), который сможет предоставить необходимую семантику: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

На данный момент вам нужно добавить transform() с хранилищем состояний и использовать знаки препинания, чтобы получить необходимую семантику (см. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...