У меня есть существующий поток, который использует две темы в качестве источника:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
stream1
.merge(stream2)
.groupByKey
.reduce(reduceValues)
.toStream
.to("result-topic")
Автоматически сгенерированное имя StateStore
равно KSTREAM-REDUCE-STATE-STORE-0000000003
.
Теперь мне нужно добавить еще один топи c в качестве источника. Однако добавление нового источника увеличивает внутренний номер kafka , в результате чего StateStore
будет KSTREAM-REDUCE-STATE-STORE-0000000005
. Я не хочу терять текущее состояние, поэтому я явно указываю имя старого StateStore
:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic
stream1
.merge(stream2)
.merge(stream3) // merge new topic
.groupByKey
.reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
.toStream
.to("result-topic")
Кажется, это работает, но я не уверен, что я вмешиваюсь в внутренности Кафки, потому что:
- Я использую собственное имя в форме того, что Кафка будет генерировать автоматически (возможность конфликта имен?)
- Набор используемых потоков кормить это
StateStore
отличается от того, что было изначально.
Есть комментарии?