Я пишу приложение Kafka Streams.Он выполняет следующие шаги: «1) использует входные данные 2) выполняет дедупликацию записи на основе нового ключа в 1-часовом окне 3) повторно выбирает ключ 4) считает ключ в 1-часовом окне 5) отправляет в нисходящий поток.
Я новичок в Kafka Streams. Насколько я понимаю, чтобы окно оставалось равным 1 часу, я также установил commit.interval.ms
равным 1 часу. Правильно ли это делать?
Как только яразвернуть мое приложение с реальным трафиком, кажется, что приложение продолжает отправлять сообщения, хотя я думал, что оно будет отправлять только кучу сообщений каждый час?
Любая помощь приветствуется !!
Моя конфигурация:
commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000
cache.max.bytes.buffering = 10485760
// dedupe by new key per window(1hr)
stream = inputStream
.selectKey(... )
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
// only keep the latest event for each customized key
.reduce((event1, event2) -> event2)
.toStream()
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
.reduce((event1, event2) -> {
long count1 = event1.getCount();
long count2 = event2.getCount();
event2.setCount(count1 + count2);
return event2;
})
.toStream()
.to(OUTPUT_TOPIC);