Я использую spring-kafka 3.1.6 и hazelcast 3.7.4.
При закрытии моего приложения кажется, что hazelcast выключается перед моим потребителем kafka.
Таким образомчто приводит к ошибкам
nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
и
org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:422)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
Мой потребитель настроен с помощью RetryTemplate.
org.springframework.context.support.DefaultLifecycleProcessor Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
Может ли кто-нибудь помочь мне в постепенном отключении с помощью ожидания фундуказа остановку контейнера kafka?
Спасибо
Редактировать
И KafkaContainer, и HazelcastInstance обрабатываются бобами весной.
HazelcastInstanceсоздается вручную (без использования автоматической конфигурации Springboot по умолчанию):
@PreDestroy
public void destroy() {
LOGGER.info("Closing Hazelcast");
Hazelcast.shutdownAll();
}
@Bean
public HazelcastInstance hazelcastInstance() {
LOGGER.debug("Configuring Hazelcast");
Config config = new Config();
...
и контейнер Kafka:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Key, Value> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Key, Value> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
factory.getContainerProperties().setErrorHandler(new DefaultErrorHandler());
return factory;
}
Редактировать 2:
I 'м запущенное приложение реплицируется 3 раза.Все 3 реплики были закрыты в одно и то же время.
Вот история журналов выключения для 1 реплики:
1) 18: 14: 54.427 Событие весны «ContextClosedEvent» было перехваченои регистрируется
2) 18: 14: 54.441 org.apache.kafka.common.errors.WakeupException: null
3) 18: 14: 55.951 com.hazelcast.core.HazelcastInstanceNotActiveException:Экземпляр Hazelcast не активен!
4) Повторите 3) 19 раз с 18: 14: 55.951 до 18: 15: 23.103
5) 18: 15: 24.441 Не удалось закрыть 1 компонентсо значением фазы 2147483547 в течение времени ожидания 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
6) 18: 15: 24.537 Закрытие Hazelcast