Как использовать постоянный StateStore между двумя потоками Kafka - PullRequest
1 голос
/ 12 апреля 2019

У меня проблемы с попыткой добиться следующего через Kafka Streams:

  • При запуске приложения (сжатая) тема alpha загружается в ключ-значение StateStore map
  • Kafka Stream использует другую тему, использует (.get) карту выше и, наконец, создает новую запись в теме alpha
  • В результате карта в памяти должна соответствовать основной теме, даже если стример перезапускается.

Мой подход заключается в следующем:

val builder = new StreamsBuilderS()

val store = Stores.keyValueStoreBuilder(
   Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)

builder.addStateStore(store)

val loaderStreamer = new LoaderStreamer(store).startStream()

[...] // I wait a few seconds until the loading is complete and the stream os running

val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!

builder
  .stream("another-topic")(Consumed.`with`(kSerde, vSerde))
  .doMyAggregationsAndgetFromTheMapAbove
  .transform(() => new StoreTransformer[K, V]("store"), "store")
  .to("alpha")(Produced.`with`(kSerde, vSerde))

LoaderStreamer(store)

[...]
val builders = new StreamsBuilderS()

builder.addStateStore(store)

builder
  .table("alpha")(Consumed.`with`(kSerde, vSerde))

builder.build
[...]

StoreTransformer

[...]
override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.store = 
    context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}

override def transform(key: K, value: V): (K, V) = {
  store.put(key, value)
  (key, value)
}
[...]

... но я получаю:

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.

при попытке получить обработчик магазина.

Есть идеи, как этого добиться?

Спасибо!

1 Ответ

2 голосов
/ 12 апреля 2019

Нельзя совместно использовать хранилище состояний между двумя приложениями Kafka Streams.

Согласно документации: https://docs.confluent.io/current/streams/faq.html#interactive-queries может быть две причины вышеупомянутого исключения:

  • Локальный экземпляр KafkaStreams еще не готов, и поэтому его локальные хранилища состояний еще не могут быть запрошены.

  • Локальный экземпляр KafkaStreams готов, но конкретное хранилище состояний было только что перенесено веще один закулисный случай.

Самый простой способ справиться с этим - подождать, пока состояние хранилища не станет запрашиваемым:

public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}

Весь пример можно найти по адресу сливной Github .

...