Можно ли установить таймаут выполнения в процессоре Kafka? - PullRequest
1 голос
/ 25 февраля 2020

Я использую Kafka Streams для обработки некоторых входных данных, поступающих из события, и я хотел бы выполнить различные функции с булевыми возвращениями для этих данных. Основное ограничение заключается в том, что каждый процессор не может работать дольше времени, определенного пользовательским параметром execution_timeout. Если время выполнения превысит этот порог, будет возвращено значение по умолчанию (true или false, в зависимости от каждого процессора).

Я прошел через несколько вопросов и даже нашел похожее здесь ; Но в моем случае я не использую функцию map / foreach. Вместо этого я выбрал API процессора Кафки, так как мне нужно выполнить произвольный код внутри каждого процессора.

При поиске решений я наткнулся на две константы

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG (commit.interval.ms)
  • StreamsConfig.POLL_MS_CONFIG (poll.ms)

, но понял, что они не предназначены для установки верхней границы времени выполнения метода process.

Есть ли в Kafka Streams механизм ad-ho c для решения такой проблемы? Если возможно, я хотел бы знать, есть ли решение, которое не включает стандартные потоки Java (например, создание экземпляра объекта ThreadPool с Futures внутри него и перехватывание InterruptedException через некоторое время).

...