Как настроить UncaughtExceptionHandler в Spring Kafka Stream - PullRequest
1 голос
/ 17 января 2020

Я использую spring-kafka для реализации потокового приложения с использованием Spring Boot 1.5.16. Используемая нами версия spring-kafka - это 1.3.8.RELEASE.

Я ищу способ закрыть загрузочное приложение в случае ошибки, которая завершает все потоки, связанные с потоками Kafka. Я обнаружил, что внутри KafkaStreams есть возможность зарегистрировать дескриптор для необработанных исключений. Метод setGlobalStateRestoreListener.

Я видел, что этот метод выставлен внутри spring-kafka в типе KStreamBuilderFactoryBean.

У меня следующий вопрос. Есть ли простой способ зарегистрировать UncaughtExceptionHandler в качестве bean-компонента и позволить Spring правильно внедрить его в фабричный bean-компонент? Или я должен создать KStreamBuilderFactoryBean свой собственный и установить вручную обработчик?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}

Большое спасибо.

1 Ответ

2 голосов
/ 17 января 2020

Да. В этой старой версии вы должны сами указать bean-компонент KStreamBuilderFactoryBean с соответствующими инъекциями и точно такой KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME.

. В более поздних версиях у нас уже есть StreamsBuilderFactoryBeanCustomizer, чтобы все еще сохранять эту автоматически сконфигурированную KStreamBuilderFactoryBean, но вы можете изменить его так, как нам нужно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...