Spring Boot KafkaListener перестает потреблять сообщения после некоторого запуска - PullRequest
0 голосов
/ 04 июня 2018

У меня есть проект Spring Boot, который запускает несколько потребителей Kafka (@KafkaListener) по темам Confluent Kakfa с 8 разделами.Параллелизм каждого потребителя установлен равным 1. Темы загружаются около миллиона строк сообщений из файла, и потребители используют их в пакетах для проверки, обработки и обновления базы данных.

На фабрике потребителей естьследующие настройки - max.poll.records = 10000, fetch.min.bytes = 100000, fetch.max.wait.ms = 1000, session.timeout.ms = 240000.

Обновление 06/04 Вот настройка Consumer Factory.Это Spring-Kafka-1.3.1. ВЫПУСК.Брокер Confluent Kafka имеет версию

@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
    ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(listingConsumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}

Примечание: Для фабрики контейнеров для автоматического запуска установлено значение false.Это делается для того, чтобы вручную запускать / останавливать потребителя при загрузке большого файла.

Через примерно 1 час работы (время варьируется) потребители перестают потреблять сообщения из своей темы, даже если в теме доступно много сообщений.В методе потребления есть оператор log, который прекращает печать в журналах.

Я отслеживаю статус получателя с помощью команды "./kafka-consumer-groups" и вижу, что в этой группе нет потребителей посленекоторое время.

$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group  group_name

Нет ошибок в журнале для этого потребителя сбой.Потребительский метод заключен в блок try-catch, поэтому он будет перехватывать любое исключение, которое было сгенерировано во время обработки сообщений.

Как мы можем спроектировать потребителя Spring-Kafka так, чтобы он перезапускал потребителя, если он останавливаетсяпотребляя?Есть ли слушатель, который может записать точную точку, когда потребитель останавливается?Это из-за установки параллелизма в 1?Причиной, по которой я должен был установить параллелизм на 1, было то, что были другие Потребители, которые замедлились, если этот Потребитель имел больше параллелизма.

1 Ответ

0 голосов
/ 05 июня 2018

Я только что провел тест с 30 секундами max.poll.interval.ms=30000, приостановил слушателя, возобновил его через 30 секунд;и я вижу это в журнале ...

2018-06-04 18:35:59.361  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

2018-06-04 18:37:07.347 ERROR 4191 --- [      foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

2018-06-04 18:37:07.350  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

Вы можете видеть, что после перебалансировки потребитель повторно добавляется, и то же сообщение повторно доставляется;чего я и ожидал.

Я получаю те же результаты;даже с 1.3.1.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...