Обновлено : Я неправильно понял вопрос OP: «Как проверить, закончилась ли топология, материализовал ввод topi c в хранилище состояний» в «Процесс восстановления хранилища состояний»
Вы может получить KeyValueStore из вашего экземпляра KafkaStreams только в том случае, если состояние KafkaStreams изменилось с состояния REBALANCING
на RUNNING
. Вы можете проверить этот переход состояния, используя StreamsBuilderFactoryBeanCustomizer
для доступа к базовому экземпляру KafkaStreams. Если вы просто хотите проверить, когда все хранилища состояний были заполнены полностью и когда поток потока kafka готов, чтобы вы могли получить KeyValueStore
, вы можете прослушивать StateListener
:
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
// set flag that `stateStore` store of current KafkaStreams has been fully restore
// then you can get
}
}
}
или если вы хотите получить хранилище из KafkaStreams
instance
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
//get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//and set flag that `stateStore` store of current KafkaStreams has been fully restore
}
});
});
}
Подробнее в документации .
Обратите внимание, что должен быть только один экземпляр StreamsBuilderFactoryBeanCustomizer.