Не удается перезапустить приложение Kafka Consumer, сбой из-за исключения OffsetOutOfRangeException - PullRequest
0 голосов
/ 25 февраля 2019

В настоящее время мое потоковое приложение Kafka Consumer вручную фиксирует смещения в Kafka с enable.auto.commit, установленным на false.Приложение не удалось, когда я попытался перезапустить его, выдав исключение ниже:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

Предполагая, что вышеупомянутая ошибка связана с сообщением, отсутствующим / раздел удален из-за периода хранения, я попробовал метод ниже:

Я отключил ручную фиксацию и включил автоматическую фиксацию (enable.auto.commit=true и auto.offset.reset=earliest). Тем не менее, происходит сбой с той же ошибкой

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

Пожалуйста, предложите способы перезапуска задания, чтобы оно могло успешно прочитать правильные данные.смещение, для которого присутствует сообщение / раздел

Ответы [ 2 ]

0 голосов
/ 23 июля 2019

Я столкнулся с той же проблемой, и я использую пакет org.apache.spark.streaming.kafka010 в своем приложении. Вначале я подозревал, что стратегия auto.offset.reset не работает, но когда я читаю описаниеМетод fixKafkaParams в объекте KafkaUtils, я обнаружил, что конфигурация была перезаписана. Я предполагаю, что причина, по которой он настраивает ConsumerConfig.AUTO_OFFSET_RESET_CONFIG для исполнителя, состоит в том, чтобы сохранить согласованное смещение, полученное драйвером и исполнителем.

0 голосов
/ 26 февраля 2019

Вы пытаетесь прочитать смещение 155555555 из раздела 12 темы partition, но, скорее всего, оно уже было удалено из-за вашей политики хранения.

Вы можете использовать Инструмент сброса приложения Kafka Streams , чтобы сбросить внутреннее состояние приложения Kafka Streams, чтобы оно могло обработать свои входные данные с нуля

$ bin/kafka-streams-application-reset.sh

Option (* = required)         Description
---------------------         -----------
* --application-id <id>       The Kafka Streams application ID (application.id)
--bootstrap-servers <urls>    Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2
                                (default: localhost:9092)
--intermediate-topics <list>  Comma-separated list of intermediate user topics
--input-topics <list>         Comma-separated list of user input topics
--zookeeper <url>             Format: HOST:POST
                                (default: localhost:2181)

или запустите своего потребителя, используя новый идентификатор группы потребителей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...