При попытке получить локальное хранилище состояний из KafkaStreams
возможно получить исключение InvalidStateStoreException
, если локальный экземпляр KafkaStreams
еще не готов или хранилище состояний было только что перенесено в другой экземпляр (перебалансировка).
Скажем, у нас есть топология DSL, которая включает локальное хранилище состояний, добавленное addStateStore
и некоторый процесс или преобразование (KStream:process
или KStream::transform
).
Ниже приведены мои вопросы:
- Может ли быть выдано исключение
InvalidStateStoreException
, когда локальное хранилище состояний получено из контекста ProcessorContext
внутри Processor::init
или Transformer::init
, т.е.
KeyValueStore<ByteString, User> userStore =
(KeyValueStore<ByteString, User>) context.getStateStore("store_name");
- Мы сохраняем ссылку на
KeyValueStore<ByteString, User> userStore
и используем ее позже для изменения userStore
внутри Punctuator::punctuate
. Должны ли мы беспокоиться о получении InvalidStateStoreException
исключения для любых операций put / get / delete с этим магазином?