Потоки Кафки - подавить до конца окна (не закрыть) - PullRequest
0 голосов
/ 31 января 2020

Я выполняю агрегацию в оконном потоке и хочу подавить ранние результаты агрегации. Под ранними результатами я подразумеваю результаты, вычисленные до окончания окна, но не те результаты, которые были получены в течение льготного периода. Таким образом, я хотел бы подавить все результаты агрегирования с помощью отметки времени <конец окна, но переслать все записи с отметкой времени> = конец окна и отметку времени <закрыть окно. </p>

Пример топологии минимальных потоков Кафки:

new StreamsBuilder()
        .stream("my-topic")
        .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
        .reduce(myReducer)
        .suppress( /* searched for*/ )
        .toStream();

Таким образом, Suppressed.untilWindowCloses( .. ) для меня не вариант, так как мне пришлось бы ждать, пока не истечет льготный период, который может быть долгим.

Согласно KIP-328 точно требуемое поведение может быть получено с использованием Suppressed.untilTimeLimit(Duration.ZERO, .. ) as (цитируется в описании KIP):

a. Как долго ждать новых обновлений перед отправкой. Это количество времени, измеряемое либо от времени события (для обычных KTables), либо от конца окна (для оконных KTables), чтобы буферизовать каждый ключ до их передачи в нисходящем направлении.

Однако Kafka Streams JavaDo c, а также соответствующая реализация подразумевают, что это не так, и ограничение по времени начинает обратный отсчет при получении первой записи для каждого (оконного) ключа, а не когда окно заканчивается.

Я был бы рад получить разъяснения по этому поводу и поддержать, как добиться желаемого поведения.

1 Ответ

1 голос
/ 10 февраля 2020

Неверное описание KIP (я соответственно обновил вики-страницу). Обратите внимание, что далее KIP говорит:

Обновления с ограниченной скоростью

Предположим, что мы будем sh, чтобы снизить частоту обновлений с KTable до примерно одно обновление каждые 30 секунд на ключ. Мы не хотим использовать слишком много памяти для этого, и мы не думаем, что у нас будут обновления для более чем 1000 ключей одновременно.

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

Следовательно, используя untilTimeLimit используется для излучения на регулярной основе. Для оконной агрегации интервальный таймер будет запускаться во время начала окна - вы все равно можете установить период ожидания равным «размеру окна», чтобы не получать никаких «ранних» обновлений, но вы не увидите каждое обновление после окончания окна прошло, но видят только обновления в "интервалах размера окна". Если у вас действительно длительный льготный период, он все еще может быть достаточно хорошим?

Вариант использования, который вы описываете, в настоящее время не поддерживается, но я думаю, что он довольно интересный и полезный. Может быть, вы можете создать функцию запроса билетов?

...