Государственный магазин Kafka Stream Custom - PullRequest
0 голосов
/ 11 ноября 2018

Я готовил документ о государственном хранилище, но мне все еще не ясно, может ли он соответствовать моей цели. Я хотел бы использовать базу данных распределенных графов в качестве хранилища состояний, из которого может потребляться другое внешнее приложение. Возможно ли это, какие усилия это требует, и может ли кто-нибудь указать мне на класс / код, который необходимо будет расширить для того, чтобы эта функциональность появилась.

1 Ответ

0 голосов
/ 11 ноября 2018

Пользовательское хранилище состояний можно реализовать с помощью Processor API, как описано здесь:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

  • Ваше пользовательское хранилище состояний должно реализовывать StateStore.
  • У вас должен быть интерфейс для представления операций, доступных в магазине.
  • Вы должны предоставить реализацию StoreBuilder для создания экземпляров вашего магазина.
  • Рекомендуется предоставить интерфейс, который ограничивает доступ к операциям только для чтения. Это не позволяет пользователям этого API изменять внешнее состояние запущенного вами приложения Kafka Streams.

Реализация будет выглядеть примерно так:

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K Key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
  // implementation of the supplier for MyCustomStore
}

для того, чтобы сделать его запрашиваемым;

  • Обеспечить реализацию QueryableStoreType.
  • Предоставляет класс-оболочку, который имеет доступ ко всем базовым экземплярам хранилища и используется для запросов.

Пример:

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceOf MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
  }

}
...