У меня есть топология, где у меня есть поток A
.
Из этого потока A
я создаю WindowedStore S
.
A --> [S]
Затем я хочу преобразовать объекты в A в зависимости от данных на S
, а также эти преобразованные объекты, чтобы они поступили в логику WindowStore (через transformValues
).
Для этого я создаю для этого Трансформатор, создаю Поток A'
и информирую о нем окошко (то есть теперь S
будет сделано из A'
, а не из A
).
A -> A' --> [S]
^__read__|
Но я не могу этого сделать, потому что, когда я создаю Топологию, выдается исключение:
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.
Есть ли способ обойти это? Это ограничение?
Пример кода:
// A
val sessionElementsStream: KStream[K, SessionElement] = ...
// A'
val sessionElementsTransformed : KStream[K, SessionElementTransformed] = {
// Here we use the sessionStoreName - but it is not added yet to the Topology
sessionElementsStream.
transformValues(sessionElementTransformerSupplier, sessionStoreName)
}
val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
sessionElementsTransformed.
groupByKey(sessionElementTransformedGroupedBy).
windowedBy(sessionWindows)
}
val sessionStore : KTable[Windowed[K], List[WindowedSession]] =
sessionElementsWindowedStream.aggregate(
initializer = List.empty[WindowedSession])(
aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))
Первоначальная проблема заключается в том, что в зависимости от значений предыдущих сессий, я хотел бы изменить сессии после нее. Но если я сделаю это в преобразователе после сеанса, эти преобразованные сеансы могут быть изменены и отправлены в нисходящем направлении - но они не будут отражать свое новое состояние в S
- поэтому дальнейшие запросы к хранилищу будут иметь старые значения.
Kafka Streams 2.1, Scala 2.12.4.
Разделенные темы.
UPDATE
Есть способ сделать это в DSL, используя дополнительную тему:
- Отправлено A '
to
этой теме
- Создайте
builder.stream
из этой темы и создайте из нее хранилище.
- Определите хранилище перед тем, как определить преобразование (чтобы на этапе преобразования можно было использовать хранилище, поскольку оно уже определено ранее).
Тем не менее, звучит громоздко, чтобы использовать здесь дополнительную тему. Нет ли другого, более простого способа ее решить?