Как разрешить Kafka Streams отправлять одну запись на ключ в течение 1 часа? - PullRequest
2 голосов
/ 01 июля 2019

Я пишу приложение Kafka Streams.Он выполняет следующие шаги: «1) использует входные данные 2) выполняет дедупликацию записи на основе нового ключа в 1-часовом окне 3) повторно выбирает ключ 4) считает ключ в 1-часовом окне 5) отправляет в нисходящий поток.

Я новичок в Kafka Streams. Насколько я понимаю, чтобы окно оставалось равным 1 часу, я также установил commit.interval.ms равным 1 часу. Правильно ли это делать?

Как только яразвернуть мое приложение с реальным трафиком, кажется, что приложение продолжает отправлять сообщения, хотя я думал, что оно будет отправлять только кучу сообщений каждый час?

Любая помощь приветствуется !!

Моя конфигурация:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000    
cache.max.bytes.buffering = 10485760

// dedupe by new key per window(1hr)
 stream = inputStream
        .selectKey(... )
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        // only keep the latest event for each customized key
        .reduce((event1, event2) -> event2)
        .toStream()
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        .reduce((event1, event2) -> {
            long count1 = event1.getCount();
            long count2 = event2.getCount();
            event2.setCount(count1 + count2);
            return event2;
        })
        .toStream()
        .to(OUTPUT_TOPIC);

Ответы [ 2 ]

2 голосов
/ 06 июля 2019

Я новичок в Kafka Streams.Насколько я понимаю, чтобы окно оставалось равным 1 часу, я также установил для commit.interval.ms значение 1 час.Правильно ли это делать?

Интервал фиксации не имеет ничего общего с вашей логикой обработки.

Возможно, вы захотите взглянуть на оператор suppress().Также может помочь следующее сообщение блока:

Модель обработки Kafka Streams является непрерывной и отправляет непрерывный результатОбновления по умолчанию.Вот почему вы получаете в основном одно выходное сообщение на каждое входное сообщение, потому что обработка входного сообщения изменяет результат.

0 голосов
/ 01 июля 2019
  1. Я бы порекомендовал вам использовать единовременную гарантию, предоставляемую последней версией kafka. Используя его, вам не придется беспокоиться о дедупликации сообщений. https://www.baeldung.com/kafka-exactly-once
  2. Конфигурирование конфигурации производителя: в частности, buffer.memory & linger.ms. (Вы также можете проверить batch.size) (проверьте https://kafka.apache.org/documentation/#producerconfigs для получения дополнительной информации)
...