Как оператор подавления KStream определяет последнюю запись окна? - PullRequest
0 голосов
/ 28 октября 2019

Вот простое определение окна с оператором подавления:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .suppress(untilWindowCloses(unbounded())
  .toStream()
  // process last event here
  ... 

Итак, мой вопрос: как оператор подавления определяет, является ли событие последним событием окна? давайте представим, я удаляю оператор подавления:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  ... 

Я понимаю, что для каждого изменения в KTable будут сгенерированы два события:

  1. Запись с nullзначение для удаления предыдущей записи
  2. Новая запись с новым значением

Я хочу удалить оператор suppress и самостоятельно обнаружить последнюю запись:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  .filter( /* detect the last record here */ )

Предоставляется ли эта информация в DSL или API процессора?

1 Ответ

1 голос
/ 30 октября 2019

Информация предоставляется только косвенно. Оператор suppress() использует хранилище состояний для отслеживания ранее полученных сообщений. Это позволяет сравнивать старые / новые сообщения друг с другом и решать, когда на самом деле что-то испускать.

Обратите внимание, что filter() без состояния не может этого достичь. Если вы хотите разобраться в деталях, вам нужно прочитать исходный код.

Основной вопрос, однако: почему вы хотите удалить suppress() для начала?

...