Как сделать так, чтобы зависание происходило в приложении на основе потока kafka - PullRequest
1 голос
/ 22 мая 2019

У меня есть настоящий env с кластером машин 3 кафка, который получает много данных.Для каждой темы существует 25 разделов с коэффициентом репликации, установленным на 2.

Мое приложение, которое (приложение на основе потока kafka) получает данные из этого кластера kafka, было закрыто более месяца.Теперь в каждом разделе огромная куча лагов;на тон 90000000.

Мне известны следующие параметры:

max.poll.records ; default —> 500
max.partition.fetch.bytes ; default —> 1048576
fetch.max.bytes ; default —> 52428800
fetch.min.bytes ; default —> 1

max.poll.interval.ms ; default —> 300000
request.timeout.ms; default —> 30000
session.timeout.ms ; default —> 10000

У меня есть 2 потребительских узла (один и тот же идентификатор группы, который потребляет данные из кластера kafka).

Тем не менее, он не догоняет отставание, он остается тем же самым.Кто-нибудь может подсказать, как можно улучшить отставание лага?

1 Ответ

0 голосов
/ 22 мая 2019

Если ваше приложение не работало в течение месяца, срок действия некоторых записей истек, поскольку срок хранения по умолчанию в теме составляет 7 дней, поэтому, по всей вероятности, вы потеряли несколько сообщений.Кроме того, сброс смещения по умолчанию сохраняется 1 или 7 дней в зависимости от версии Kafka Streams.Кажется, у вас есть auto.offset.reset: earliest, поэтому он начинает потреблять сообщения с начала для каждого раздела.Если вам нужно пропустить все сообщения и использовать только новые сообщения, вы должны установить auto.offset.reset: latest и изменить значение application.id на новое.

Если вы хотите параллельно использовать сообщения и ускорить уменьшение задержки, вы можете либо установить config num.stream.threads на какое-то значение, например 12 (num.stream.threads * numberOfConsumerNodes должно быть меньше или равно numberOfPartitions, в противном случае некоторые потоки будут простаивать), либо увеличить число узлов-потребителей.

...