Кафка: обновить ключ, когда нет обновления в течение x времени - PullRequest
0 голосов
/ 23 октября 2019

Есть ли способ при использовании Kafka обновлять ключ после того, как он не был просмотрен в течение x раз?

Что-то вроде

records
    .groupByKey
    .windowedBy(
         TimeWindows
         .of(Duration.ofMinutes(5))
         .grace(Duration.ofMinutes(1))
         .advanceBy(Duration.ofMinutes(1))
    ).count()
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
    ).updateNotSeen(Duration.ofMinutes(30), (k) => (k, 0))

Так что здесь, Кафка испускает новую запись всякий раз, когда она не видела записи через 30 минут. (Сделано гипотетическим обновлением NotSeen.)

В моем поиске я обнаружил этот открытый выпуск, который, если он там был, позволил мне сделать это каким-то образом, но я нене знаю, как я буду делать это сейчас.

1 Ответ

3 голосов
/ 24 октября 2019

Насколько я знаю, это невозможно в DSL (Java, Scala).

Пока такая функциональность не предоставляется "из коробки", вы можете реализовать такую ​​пользовательскую функцию самостоятельно, используя API процессора Kafka Streams, однако. (Аналогичным образом можно использовать API-интерфейс процессора для реализации пользовательских операций объединения.)В этом случае вы не будете работать с таблицами - которые являются абстракцией только для DSL - но с хранилищами состояний (таблицы поддерживаются хранилищами состояний, fwiw), которые поддерживают прямой доступ для чтения и записи изприлагается Processor с или Transformer с. Процессоры и преобразователи поддерживают пунктуацию для планирования периодических действий, аналогично cron. Во время такого запланированного действия вы можете проверить, не видела ли какая-либо запись, идентифицированная ее ключом записи, обновление за последние 30 минут, а затем действовать соответствующим образом.

Кроме того, очень полезно знать, что вы можете объединяет Processor API и DSL (который вы использовали до сих пор). То есть вы можете продолжать использовать DSL для большей части своего кода и только «подключать» вышеупомянутые процессоры / трансформаторы (из Processor API), когда и где это необходимо.

Надеюсь, это поможет!

...