Может ли клиент kafka streams программно определить свое потребительское отставание - PullRequest
1 голос
/ 09 марта 2019

Я работаю над сервисом, который использует kafka streams api.Мне интересно, есть ли способ определить, насколько далеко отстает мой сервис в потреблении записей.Я хочу иметь возможность запросить отставание потребителя.

Вот некоторые сведения о том, чего я пытаюсь достичь.Мой сервис использует API-потоки, он прослушивает входную тему, выполняет некоторую обработку, связанную с состоянием, и выводит запись по выходной теме.

Я хочу позаботиться о сценарии, когда происходит сбой моей службы, а затем происходитвернуться через несколько часов.В течение этого времени будет накоплено огромное количество записей по теме ввода.

Как только он вернется в оперативный режим, служба начнет использовать все накопленные записи из входной темы, а также выведет много записей по выходной теме.

Я хочу иметь возможность обнаруживатьтот факт, что мой сервис имеет огромное потребительское отставание и задерживает его производство, если это так.То есть я хочу, чтобы моя служба использовала все накопленные входные записи до тех пор, пока она не приблизится к почти реальному времени, и только тогда она должна начать выводить сообщения.

Наилучший способ, который я нашел на данный момент, это подключить метод ConsumerInterceptor.

ConsumerInterceptor.onConsume() при каждом чтении записей:

    ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> records)

ИзConsumerRecords, тогда я могу получить временную метку записей.Если временная метка слишком сильно отстает от текущего времени, я бы остановил вывод сообщений.

Вместо того, чтобы основываться на временных метках в записях, было бы лучше, если бы я мог как-то запросить задержку потребителя.

Может быть, я не могу запросить отставание потребителя, потому что это идет вразрез с принципом, для которого предназначен kafka.Если у кого-то есть какие-либо предложения или как мне следует подойти к своей проблеме в целом, пожалуйста, дайте мне знать.

В качестве примечания, мой сервис не использует высокоуровневый API-интерфейс DSL для потоков kafka, но используетAPI процессора нижнего уровня.

Спасибо за потраченное время.

...