Мы используем Kafka Streams для потребления, обработки и производства сообщений, а в PROD env мы столкнулись с ошибками по нескольким темам:
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app]
Offset commit failed on partition xxx-1 at offset 13920:
The request timed out.[]
Эти ошибки встречаются редко для тем с небольшой нагрузкой, но для тем с высокой нагрузкой (и скачками) ошибки возникают десятки раз в день на тему. Темы имеют несколько разделов (например, 10). Кажется, эта проблема не влияет на обработку данных (несмотря на производительность), так как после выдачи исключения (даже может быть несколько ошибок для одного и того же смещения), потребитель позже перечитывает сообщение и успешно обрабатывает его.
Я вижу, что это сообщение об ошибке появилось в kafka-clients
версии 1.0.0
из-за PR , но в предыдущих kafka-clients
версиях для того же варианта использования (Errors.REQUEST_TIMED_OUT
для потребителя) подобное сообщение ( Offset commit for group {} failed: {}
) был зарегистрирован с уровнем debug
.
для меня было бы более логично обновить уровень журнала до предупреждения для такого варианта использования.
Как решить эту проблему? Что может быть основной причиной? Возможно, изменение пользовательских свойств или настройка раздела может помочь избавиться от такой проблемы.
мы используем следующую реализацию для создания потоков Kafka:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);
наши потребительские настройки Kafka:
bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000 # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
версия брокера kafka: kafka_2.11-0.11.0.2
.
ошибка возникает в обеих версиях Kafka Streams: 1.0.1
и 1.1.0
.