У меня есть простая оконная топология:
builder.stream("input-topic", Consumed.with(...))
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.transformValues(FrameTransformer::new)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
Я хотел бы поместить фактическое смещение записи начала окна в агрегирующий объект Frame.
Как я могу получить доступ к смещению записи из функции windowAggregator
(aggregate()
handler)?
Я знаю, что могу получить доступ к смещению записи в FrameTransformer
, но это не помогает мне создавать точные Frame
объекты, описывающие мои windows в терминах начального и конечного смещения.
Я слышал, что есть способ сделать это, вставив еще один вызов .transform()
перед groupByKey()
, там я могу получить доступ к смещениям, но тогда мне нужно будет изменить схему моих записей событий, чтобы хранить там информацию о смещениях.
Есть ли (более простой) способ достижения моего намерения?
Обновление
Фактически мне удалось получить точные начальные и конечные смещения окон в Frame
объектах следующим образом
builder.stream("input-topic", Consumed.with(...))
.transformValues(EventTransformer::new)
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
Но, как уже упоминалось выше, за счет редактирования схемы * 10 28 * объект.