Kafka Streams - Использование существующего хранилища состояний после добавления нового исходного потока - PullRequest
1 голос
/ 29 марта 2020

У меня есть существующий поток, который использует две темы в качестве источника:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")

stream1
  .merge(stream2)
  .groupByKey
  .reduce(reduceValues)
  .toStream
  .to("result-topic")

Автоматически сгенерированное имя StateStore равно KSTREAM-REDUCE-STATE-STORE-0000000003.

Теперь мне нужно добавить еще один топи c в качестве источника. Однако добавление нового источника увеличивает внутренний номер kafka , в результате чего StateStore будет KSTREAM-REDUCE-STATE-STORE-0000000005. Я не хочу терять текущее состояние, поэтому я явно указываю имя старого StateStore:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic

stream1
  .merge(stream2)
  .merge(stream3) // merge new topic
  .groupByKey
  .reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
  .toStream
  .to("result-topic")

Кажется, это работает, но я не уверен, что я вмешиваюсь в внутренности Кафки, потому что:

  1. Я использую собственное имя в форме того, что Кафка будет генерировать автоматически (возможность конфликта имен?)
  2. Набор используемых потоков кормить это StateStore отличается от того, что было изначально.

Есть комментарии?

1 Ответ

3 голосов
/ 29 марта 2020

Честно говоря, самым безопасным вариантом было бы добавить понятное человеку имя к этому состоянию, но, как вы упомянули, вы потеряете его.

Я предполагаю, что не должно быть никаких проблем с что вы сделали (по крайней мере, пока не внесете еще одно изменение в код :)). Идентификатор 0000000003 будет назначен оператору groupByKey, поэтому никаких конфликтов не будет (хотя я не уверен на 100% в отношении внутренних компонентов Kafka Streams).

Также есть Приложение Инструмент сброса , который позволяет восстанавливать агрегаты. Но я не знаю, применимо ли это к вашему случаю: ваша политика хранения по темам ввода может помешать этому инструменту создавать точные агрегаты.

...