Kafka Streams: Как использовать persistentKeyValueStore для перезагрузки существующих сообщений с диска? - PullRequest
1 голос
/ 29 мая 2019

Мой код в настоящее время использует InMemoryKeyValueStore, что позволяет избежать сохранения на диске или на kafka.Я хочу использоватьcksdb (Stores.persistentKeyValueStore), чтобы приложение перезагрузило состояние с диска.Я пытаюсь реализовать это, и я очень плохо знаком с Kafka и API потоков.Буду признателен за помощь в том, как я могу внести изменения, хотя я все еще пытаюсь понять вещи, как я иду.

Я пытался создать хранилище состояний здесь:

StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
                Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);

Как мне зарегистрировать его в построителе потоков?

Существующий код, который использует inMemoryKeyValueStore:

   static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
            final String storeKey,
            final Serde<LinkedList<StoreItem>> valueSerde,
            final boolean loggingDisabled) {

        final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
        return storeBuilder;
    }

Мне нужно убедиться, что приложение потоков не будет пропускать существующие сообщения в теме журнала при каждом перезапуске.

Ответы [ 2 ]

0 голосов
/ 03 июня 2019

Вы используете постоянное хранилище точно так же, как хранилище в памяти.Магазин позаботится обо всем остальном, и вам не нужно беспокоиться о загрузке данных и т. Д. Вы просто используете его.

0 голосов
/ 02 июня 2019

Как мне зарегистрировать его в построителе потоков?

Вызвав StreamsBuilder#addStateStore().

https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addStateStore-org.apache.kafka.streams.state.StoreBuilder-

См. StateStoresInTheDSLIntegrationTestв https://github.com/confluentinc/kafka-streams-examples для сквозного демонстрационного приложения.

...