У меня довольно медленный потребитель, который может занять более 5 минут для обработки записи. Чего я хочу избежать, так это как Кафка стабилизирует группу.
Чтобы сделать это из моего понимания, я должен установить для брокера kafka следующие свойства:
group.max.session.timeout.ms = 3600001
group.min.session.timeout.ms = 3600000
В моей стороне приложения у меня есть следующая конфигурация:
@Bean
public Map<String, Object> consumerConfigs() {
final Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
environment.getProperty("app.kafkaBrokers"));
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) );
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) + 1 );
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(9);// was 3
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
А также в моем слушателе:
@KafkaListener(id = "baz", topics = "tipJobsForExecution", containerFactory="kafkaListenerContainerFactory")
public void listen(ConsumerRecord<?, ?> record)
Моему слушателю требуется около 5 минут для обработки сообщений. Как только он закончился, я прочитал следующее на стороне брокера kafka:
2018-05-03 10: 29: 11,210] ИНФОРМАЦИЯ [GroupCoordinator 0]: подготовка к
перебалансировать группу баз со старым поколением 22 (__consumer_offsets-7)
(Kafka.coordinator.group.GroupCoordinator)
Насколько я понимаю, кафка считает потребителя мертвым и восстанавливает равновесие в группе. Мой вопрос: почему это происходит?
У меня есть одна идея, что, возможно, сердцебиение не сердцебиение каждые 3000 мс, как должно, но я не знаю, насколько это неприятно.
Заранее спасибо,
Яннис