Я использую пружинную кафку в своем приложении. Я хочу добавить некоторую задержку в 15 минут для использования записей для одного из слушателей - kafkaRetryListenerContainerFactory. У меня есть два слушателя. Ниже моя конфигурация:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(retryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
Прослушиватель повторных попыток Кафки:
@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}",
containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
Thread.sleep(900000);
LOG.info(String.format("Consumed retry message -> %s", record.toString()));
acknowledgment.acknowledge();
}
Когда я добавил Thread.sleep (), я получаю постоянную ошибку перебалансировки в журналах
Attempt to heartbeat failed since group is rebalancing
Моя весенняя версия кафки - 2.3.4
Ниже приведены значения конфигурации:
max.poll.interval.ms = 1200000 (это выше, чем thread.sleep)
heartbeat.interval.ms = 3000
session.timeout.ms = 10000
Я попробовал ack.nack (900000); Все еще получаю ошибку перебалансировки
Любая помощь будет оценена