Возможно ли InvalidStateStoreException при получении хранилища состояний из ProcessorContext? - PullRequest
0 голосов
/ 18 января 2019

При попытке получить локальное хранилище состояний из KafkaStreams возможно получить исключение InvalidStateStoreException, если локальный экземпляр KafkaStreams еще не готов или хранилище состояний было только что перенесено в другой экземпляр (перебалансировка).

Скажем, у нас есть топология DSL, которая включает локальное хранилище состояний, добавленное addStateStore и некоторый процесс или преобразование (KStream:process или KStream::transform).

Ниже приведены мои вопросы:

  1. Может ли быть выдано исключение InvalidStateStoreException, когда локальное хранилище состояний получено из контекста ProcessorContext внутри Processor::init или Transformer::init, т.е.
KeyValueStore<ByteString, User> userStore =  
    (KeyValueStore<ByteString, User>) context.getStateStore("store_name"); 
  1. Мы сохраняем ссылку на KeyValueStore<ByteString, User> userStore и используем ее позже для изменения userStore внутри Punctuator::punctuate. Должны ли мы беспокоиться о получении InvalidStateStoreException исключения для любых операций put / get / delete с этим магазином?

1 Ответ

0 голосов
/ 18 января 2019

Может ли исключение InvalidStateStoreException генерироваться, когда локальное хранилище состояний получается из контекста ProcessorContext внутри Processor :: init или Transformer :: init, т.е.

Нет. init() не будет вызван, пока магазин не будет готов. Таким образом, InvalidStateStoreException никогда не произойдет.

Мы храним ссылку на KeyValueStore userStore и используем ее позже, чтобы изменить userStore внутри Punctuator :: punctuate. Стоит ли беспокоиться о получении исключения InvalidStateStoreException при любых операциях put / get / delete с этим хранилищем?

Нет. Вы можете смело читать / писать в магазине. InvalidStateStoreException никогда не произойдет. punctuate() выполняется тем же потоком, что и process(), и гарантируется, что хранилище готово при вызове punctuate().

...