Я пытаюсь использовать KSQL для выполнения любой возможной обработки в течение определенного времени и получения результатов в это время.См. Своевременная (и с учетом состояния) обработка с использованием Apache Beam в разделе «Таймеры обработки» для той же идеи, проиллюстрированной с использованием Apache Beam.
Дано:
- Потоктранзакций с уникальными ключами;
- Обновление этих транзакций в одном потоке;и
- нисходящий процессор, который хочет получать обновленные транзакции в определенное время - скажем, 20 секунд - после того, как транзакции появились в первом потоке.
Концептуально я думал осоздание KTable первого потока для хранения последнего состояния транзакций и использование KSQL для создания выходного потока путем запроса KTable для ключей с (create_time + timeout)
Я не нашел способа сделать это в документации по KSQL, и даже если бы былвстроенный current_time, я не уверен, что он будет оцениваться до тех пор, пока другая запись не выйдет в поток.
Как я могу сделать это в KSQL?Нужен ли пользовательский UDF?Если это невозможно сделать в KSQL, могу ли я сделать это в KStreams?
=====
Обновление: похоже, что KStreams не поддерживает это сегодня - похоже, что Apache Flinkбыть подходящим для этого варианта использования (и многих других).Если вы знаете хитрый способ обойти ограничения KStreams, скажите мне!