Потребитель Kafka нуждается в новой конфигурации идентификатора приложения после перезапуска - PullRequest
0 голосов
/ 14 октября 2019

У меня есть приложение Java Kafka, которое использует тему. Иногда встречается ошибка INVALID_FETCH_SESSION_EPOCH. После перезапуска приложения оно не начинает потреблять снова. Когда я изменяю StreamsConfig.APPLICATION_ID_CONFIG, он начинает потреблять, но, очевидно, не с того же смещения. Какова причина того, что он не продолжает потреблять? Это как-то связано с ошибкой, которую я получаю, или причина в чем-то другом?

Ответы [ 2 ]

0 голосов
/ 14 октября 2019

InvalidFetchSessionEpochException возникает, когда период сеанса выборки запроса отличается от ожидаемого. Если в течение долгого времени вы выбираете вариант ответа, существует возможность, что поток опроса может отправить другой новый запрос с эпохой, тогда как ответ приходит с другой отметкой времени, и это сообщение не прекратит потреблять сообщение. Вы можете попытаться установить параметр для уменьшения размера буфера, попробуйте уменьшить размер сброса, чтобы уменьшить максимальный размер записи, чтобы убедиться, что он работает для вас. Вы также можете попробовать использовать обновленный Consumer. Не могли бы вы опубликовать настройку потребителя, которую вы используете?

Сведения о смещении потребителя управляются и сохраняются zookeeper на основе идентификатора группы / идентификатора приложения, поэтому, если вы используете тот же идентификатор потребителя, он должен продолжать потреблять отгде это осталось. Принимая во внимание, что если вы измените свой идентификатор потребителя, он начнется с самого раннего / самого последнего в зависимости от ваших настроекЕсли он не начал потреблять с тем же идентификатором после перезапуска, это может быть причиной того, что для восстановления баланса требуется много времени, или все еще ждет получения максимальной записи для начала потребления. Попробуйте уменьшить max.request.siz, max.poll.interval.ms и max.poll.records

0 голосов
/ 14 октября 2019

Иногда эта проблема вызвана несовместимостью версий между кластером и клиентом. Убедитесь, что вы используете правильную версию клиента, которая совместима с вашей версией кластера.

Вы можете обратиться к матрице совместимости здесь

...