У меня есть проект Spring Boot, который запускает несколько потребителей Kafka (@KafkaListener) по темам Confluent Kakfa с 8 разделами.Параллелизм каждого потребителя установлен равным 1. Темы загружаются около миллиона строк сообщений из файла, и потребители используют их в пакетах для проверки, обработки и обновления базы данных.
На фабрике потребителей естьследующие настройки - max.poll.records = 10000, fetch.min.bytes = 100000, fetch.max.wait.ms = 1000, session.timeout.ms = 240000.
Обновление 06/04 Вот настройка Consumer Factory.Это Spring-Kafka-1.3.1. ВЫПУСК.Брокер Confluent Kafka имеет версию
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
ConsumerFactory<String, ListingMessage> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(listingConsumerFactory);
factory.setConcurrency(1);
factory.setAutoStartup(false);
factory.setBatchListener(true);
return factory;
}
Примечание: Для фабрики контейнеров для автоматического запуска установлено значение false.Это делается для того, чтобы вручную запускать / останавливать потребителя при загрузке большого файла.
Через примерно 1 час работы (время варьируется) потребители перестают потреблять сообщения из своей темы, даже если в теме доступно много сообщений.В методе потребления есть оператор log, который прекращает печать в журналах.
Я отслеживаю статус получателя с помощью команды "./kafka-consumer-groups" и вижу, что в этой группе нет потребителей посленекоторое время.
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
Нет ошибок в журнале для этого потребителя сбой.Потребительский метод заключен в блок try-catch, поэтому он будет перехватывать любое исключение, которое было сгенерировано во время обработки сообщений.
Как мы можем спроектировать потребителя Spring-Kafka так, чтобы он перезапускал потребителя, если он останавливаетсяпотребляя?Есть ли слушатель, который может записать точную точку, когда потребитель останавливается?Это из-за установки параллелизма в 1?Причиной, по которой я должен был установить параллелизм на 1, было то, что были другие Потребители, которые замедлились, если этот Потребитель имел больше параллелизма.