Ошибка Kafka Streams - ошибка смещения фиксации на разделе, истекло время ожидания запроса - PullRequest
0 голосов
/ 28 июня 2018

Мы используем Kafka Streams для потребления, обработки и производства сообщений, а в PROD env мы столкнулись с ошибками по нескольким темам:

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] 
Offset commit failed on partition xxx-1 at offset 13920: 
The request timed out.[]

Эти ошибки встречаются редко для тем с небольшой нагрузкой, но для тем с высокой нагрузкой (и скачками) ошибки возникают десятки раз в день на тему. Темы имеют несколько разделов (например, 10). Кажется, эта проблема не влияет на обработку данных (несмотря на производительность), так как после выдачи исключения (даже может быть несколько ошибок для одного и того же смещения), потребитель позже перечитывает сообщение и успешно обрабатывает его.

Я вижу, что это сообщение об ошибке появилось в kafka-clients версии 1.0.0 из-за PR , но в предыдущих kafka-clients версиях для того же варианта использования (Errors.REQUEST_TIMED_OUT для потребителя) подобное сообщение ( Offset commit for group {} failed: {}) был зарегистрирован с уровнем debug. для меня было бы более логично обновить уровень журнала до предупреждения для такого варианта использования.

Как решить эту проблему? Что может быть основной причиной? Возможно, изменение пользовательских свойств или настройка раздела может помочь избавиться от такой проблемы.

мы используем следующую реализацию для создания потоков Kafka:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);

наши потребительские настройки Kafka:

bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000       # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor

версия брокера kafka: kafka_2.11-0.11.0.2. ошибка возникает в обеих версиях Kafka Streams: 1.0.1 и 1.1.0.

1 Ответ

0 голосов
/ 03 июля 2018

Похоже, у вас проблема с кластером Kafka, и у потребителя Kafka истекло время ожидания при попытке зафиксировать смещения. Вы можете попробовать увеличить конфиги, связанные с подключением, для потребителя Kafka

  1. request.timeout.ms (по умолчанию 305000ms)

Конфигурация контролирует максимальное количество времени, которое клиент будет дождаться ответа на запрос

  1. connections.max.idle.ms (по умолчанию 540000ms)

Закрытие незанятых соединений через количество миллисекунд, указанное этот конфиг.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...