Изящное отключение потребителя кафки - PullRequest
0 голосов
/ 05 октября 2018

Я использую spring-kafka 3.1.6 и hazelcast 3.7.4.

При закрытии моего приложения кажется, что hazelcast выключается перед моим потребителем kafka.

Таким образомчто приводит к ошибкам

nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!

и

org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:422)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)

Мой потребитель настроен с помощью RetryTemplate.

org.springframework.context.support.DefaultLifecycleProcessor   Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

Может ли кто-нибудь помочь мне в постепенном отключении с помощью ожидания фундуказа остановку контейнера kafka?

Спасибо

Редактировать

И KafkaContainer, и HazelcastInstance обрабатываются бобами весной.

HazelcastInstanceсоздается вручную (без использования автоматической конфигурации Springboot по умолчанию):

@PreDestroy
public void destroy() {
    LOGGER.info("Closing Hazelcast");
    Hazelcast.shutdownAll();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    LOGGER.debug("Configuring Hazelcast");
    Config config = new Config();
    ...

и контейнер Kafka:

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Key, Value> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Key, Value> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    factory.getContainerProperties().setErrorHandler(new DefaultErrorHandler());
    return factory;
}

Редактировать 2:

I 'м запущенное приложение реплицируется 3 раза.Все 3 реплики были закрыты в одно и то же время.

Вот история журналов выключения для 1 реплики:

1) 18: 14: 54.427 Событие весны «ContextClosedEvent» было перехваченои регистрируется

2) 18: 14: 54.441 org.apache.kafka.common.errors.WakeupException: null

3) 18: 14: 55.951 com.hazelcast.core.HazelcastInstanceNotActiveException:Экземпляр Hazelcast не активен!

4) Повторите 3) 19 раз с 18: 14: 55.951 до 18: 15: 23.103

5) 18: 15: 24.441 Не удалось закрыть 1 компонентсо значением фазы 2147483547 в течение времени ожидания 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

6) 18: 15: 24.537 Закрытие Hazelcast

1 Ответ

0 голосов
/ 05 октября 2018

Если потребительский бин Kafka не зависит напрямую от бина Hazelcast, вы можете определить порядок между бинами (как во время инициализации, так и во время уничтожения), используя атрибут depends-on определений бина.Как;

<bean id="kafka-consumer" class="KafkaConsumerClassName" depends-on="hazelcast"/>
<bean id="hazelcast" class="HazelcastInstance" />

В этом случае bean-компонент Hazelcast будет создан и инициализирован до потребительского EJB Kafka и будет уничтожен после EJB Kafka.

См. Использование зависимого раздела в справочном руководстве Spring.

...