У меня проблемы с попыткой добиться следующего через Kafka Streams:
- При запуске приложения (сжатая) тема
alpha
загружается в ключ-значение StateStore
map
- Kafka Stream использует другую тему, использует (.get) карту выше и, наконец, создает новую запись в теме
alpha
- В результате карта в памяти должна соответствовать основной теме, даже если стример перезапускается.
Мой подход заключается в следующем:
val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)
builder.addStateStore(store)
val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream os running
val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!
builder
.stream("another-topic")(Consumed.`with`(kSerde, vSerde))
.doMyAggregationsAndgetFromTheMapAbove
.transform(() => new StoreTransformer[K, V]("store"), "store")
.to("alpha")(Produced.`with`(kSerde, vSerde))
LoaderStreamer(store)
[...]
val builders = new StreamsBuilderS()
builder.addStateStore(store)
builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]
StoreTransformer
[...]
override def init(context: ProcessorContext): Unit = {
this.context = context
this.store =
context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}
override def transform(key: K, value: V): (K, V) = {
store.put(key, value)
(key, value)
}
[...]
... но я получаю:
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.
при попытке получить обработчик магазина.
Есть идеи, как этого добиться?
Спасибо!