Ошибка постоянного хранилища Kafka Streams: хранилище состояний, возможно, перенесено в другой экземпляр - PullRequest
0 голосов
/ 25 апреля 2018

Я использую Kafka Streams с Spring Boot.В моем случае использования, когда я получаю событие клиента от другого микросервиса, мне нужно хранить в материализованном представлении customer , а когда я получаю событие order , мне нужно присоединиться к клиенту и заказать, а затем сохранить в заказ клиента материализованный вид.Чтобы добиться этого, я создал постоянное хранилище значений ключей customer-store и , обновляя его при появлении нового события .

    StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer"),Serdes.String(), customerSerde).withLoggingEnabled(new HashMap<>());
streamsBuilder.addStateStore(customerStateStore);
KTable<String,Customer> customerKTable=streamsBuilder.table("customer",Consumed.with(Serdes.String(),customerSerde));
    customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: "+value)));

Сконфигурированная топология, Streams и объект запущенных потоков.Когда я пытаюсь получить доступ к хранилищу с помощью ReadOnlyKeyValueStore, я получаю следующее исключение, хотя несколько минут назад я хранил некоторые объекты

streams.start();
ReadOnlyKeyValueStore<String, Customer> customerStore = streams.store("customer", QueryableStoreTypes.keyValueStore());
System.out.println("customerStore.approximateNumEntries()-> " + customerStore.approximateNumEntries());

Код, загруженный в Github для справки.Благодарим Вас за помощь.

Исключение:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, customer, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
    at com.kafkastream.service.EventsListener.main(EventsListener.java:94)

1 Ответ

0 голосов
/ 25 апреля 2018

Государственному магазину обычно требуется некоторое время для подготовки.Самый простой подход, как показано ниже.(код из официального документа)

public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}

Дополнительную информацию можно найти в документе.https://docs.confluent.io/current/streams/faq.html#interactive-queries

...