Вы можете сделать это, используя StreamsBuilderFactoryBeanCustomizer
, который дает вам доступ к базовому KafkaStreams
объекту. Если вы используете связующие версии 3.0 или выше, это рекомендуемый подход. Например, вы можете указать следующее bean
в своем приложении и настроить его с помощью GlobalStateRestoreListener
.
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setGlobalStateRestoreListener(...);
}
});
};
}
Этот блог содержит более подробную информацию об этой стратегии.