Можно ли условно приостановить и возобновить Kafka Stream? - PullRequest
0 голосов
/ 15 июня 2019

У меня есть требование, как указано @ https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results для ожидания закрытия окна, чтобы обработать событие позднего выхода из строя путем буферизации его на время действия окна.

В моем понимании этогоКогда окно создано, окно работает подобно обработке настенных часов, например, Создание окна за 1 час. Окно начинает тикать, как только приходит первое событие.Это 1-часовое окно закрывается ровно через час, и все буферизованные события будут перенаправлены в нисходящий поток.Тем не менее, мне нужно иметь возможность удерживать это окно даже дольше условно, пока это требуется, например, на основе состояния / информации во внешней системе, такой как база данных.

Чтобы быть точным, мое требование для пересылки событий (окна 1 час, если запись внешнего состояния говорит, что это хорошо) или (удерживайте столько, сколько требуется, пока внешняя запись не скажет, что это хорошо, и возобновите отслеживание события дособытие должно быть полностью 1 ч, не учитывая время, когда внешняя система не годится). Чтобы уточнить это 2-е условие, например, если продолжительность моего окна 1 1 ч, мое событие начинается в 00:00, если в 00:30 оно не работает и возвращается в нормальное состояние.в 00:45 окно должно быть продлено до 01: 15.

Можно ли приостановить и возобновить переадресацию событий, условно основываясь на моем требовании выше?Нужно ли использовать преобразование / процессор и использовать хранилище значений вручную, чтобы отслеживать первое время обработки моего события и условно пересылать буферизованные события в пунктуатор?

Я ценю все виды обходного пути и предложения для этого требования.

1 Ответ

0 голосов
/ 16 июня 2019

окно работает как обработка настенных часов

Нет.Потоки Kafka работают с временем-событием, следовательно, временные метки, возвращаемые из TimestampExtractor (по умолчанию встроенная временная метка записи), используются для опережения времени.

Чтобы быть точным, мое требование к пересылке событийis (окна в течение 1 часа, если внешняя запись состояния говорит, что это хорошо)

Для этого потребуется специальное решение IMHO.

или (удерживайте столько, сколько требуется довнешняя запись говорит, что это хорошо, и возобновите отслеживание события до тех пор, пока оно не станет полностью 1 ч, не учитывая время, когда внешняя система не работает)

Не 100%, если я понимаю эту часть.

Можно ли приостановить и возобновить переадресацию событий условно на основании моего требования выше?

Нет.

Нужно ли мне использовать преобразование / процессор и использовать хранилище значений вручную, чтобы отслеживать первое время обработки моего события и условно пересылать буферизованные события в пунктуатор?

Я думаю, что это может потребоваться.

Ознакомьтесь с этим сообщением в блоге, в котором подробно объясняется, как suppress() работает, и когда оно генерируется на основе наблюдаемого времени события: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers

...