Я использую spring-kafka
для реализации потокового приложения с использованием Spring Boot 1.5.16. Используемая нами версия spring-kafka
- это 1.3.8.RELEASE.
Я ищу способ закрыть загрузочное приложение в случае ошибки, которая завершает все потоки, связанные с потоками Kafka. Я обнаружил, что внутри KafkaStreams
есть возможность зарегистрировать дескриптор для необработанных исключений. Метод setGlobalStateRestoreListener
.
Я видел, что этот метод выставлен внутри spring-kafka
в типе KStreamBuilderFactoryBean
.
У меня следующий вопрос. Есть ли простой способ зарегистрировать UncaughtExceptionHandler
в качестве bean-компонента и позволить Spring правильно внедрить его в фабричный bean-компонент? Или я должен создать KStreamBuilderFactoryBean
свой собственный и установить вручную обработчик?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
streamsConfig);
streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
// Something happens here
});
return streamBuilderFactoryBean;
}
Большое спасибо.