Пользовательское хранилище состояний можно реализовать с помощью Processor API, как описано здесь:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Ваше пользовательское хранилище состояний должно реализовывать StateStore.
- У вас должен быть интерфейс для представления операций, доступных в магазине.
- Вы должны предоставить реализацию StoreBuilder для создания экземпляров вашего магазина.
- Рекомендуется предоставить интерфейс, который ограничивает доступ к операциям только для чтения. Это не позволяет пользователям этого API изменять внешнее состояние запущенного вами приложения Kafka Streams.
Реализация будет выглядеть примерно так:
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
// implementation of the actual store
}
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
void write(K Key, V value);
}
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
V read(K key);
}
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
// implementation of the supplier for MyCustomStore
}
для того, чтобы сделать его запрашиваемым;
- Обеспечить реализацию QueryableStoreType.
- Предоставляет класс-оболочку, который имеет доступ ко всем базовым экземплярам хранилища и используется для запросов.
Пример:
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore) {
return stateStore instanceOf MyCustomStore;
}
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
}
}