я могу использовать KSQL для генерации тайм-аутов обработки? - PullRequest
0 голосов
/ 14 ноября 2018

Я пытаюсь использовать KSQL для выполнения любой возможной обработки в течение определенного времени и получения результатов в это время.См. Своевременная (и с учетом состояния) обработка с использованием Apache Beam в разделе «Таймеры обработки» для той же идеи, проиллюстрированной с использованием Apache Beam.

Дано:

  1. Потоктранзакций с уникальными ключами;
  2. Обновление этих транзакций в одном потоке;и
  3. нисходящий процессор, который хочет получать обновленные транзакции в определенное время - скажем, 20 секунд - после того, как транзакции появились в первом потоке.

Концептуально я думал осоздание KTable первого потока для хранения последнего состояния транзакций и использование KSQL для создания выходного потока путем запроса KTable для ключей с (create_time + timeout)

Я не нашел способа сделать это в документации по KSQL, и даже если бы былвстроенный current_time, я не уверен, что он будет оцениваться до тех пор, пока другая запись не выйдет в поток.

Как я могу сделать это в KSQL?Нужен ли пользовательский UDF?Если это невозможно сделать в KSQL, могу ли я сделать это в KStreams?

=====

Обновление: похоже, что KStreams не поддерживает это сегодня - похоже, что Apache Flinkбыть подходящим для этого варианта использования (и многих других).Если вы знаете хитрый способ обойти ограничения KStreams, скажите мне!

1 Ответ

0 голосов
/ 26 ноября 2018

Взгляните на функциональность punctuate() в Processor API Kafka Streams, которая может быть тем, что вы ищете. Вы можете использовать punctuate () со временем потока (по умолчанию: событие-время), а также со временем обработки (через PunctuationType.WALL_CLOCK_TIME). Здесь вы должны реализовать Processor или Transformer, в зависимости от ваших потребностей, которые будут использовать punctuate() для функции тайм-аута.

См. https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html для получения дополнительной информации.

Совет: Вы можете использовать такой Процессор / Трансформатор также в DSL Kafka Streams. Это означает, что вы можете продолжать использовать более удобный DSL, если хотите, и вам нужно только подключить процессор / преобразователь в нужном месте вашего кода на основе DSL.

...