Смещение записей доступа в агрегаторе потоков Кафки - PullRequest
0 голосов
/ 30 апреля 2020

У меня есть простая оконная топология:

builder.stream("input-topic", Consumed.with(...))
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .transformValues(FrameTransformer::new)
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

Я хотел бы поместить фактическое смещение записи начала окна в агрегирующий объект Frame.

Как я могу получить доступ к смещению записи из функции windowAggregator (aggregate() handler)?

Я знаю, что могу получить доступ к смещению записи в FrameTransformer, но это не помогает мне создавать точные Frame объекты, описывающие мои windows в терминах начального и конечного смещения.

Я слышал, что есть способ сделать это, вставив еще один вызов .transform() перед groupByKey(), там я могу получить доступ к смещениям, но тогда мне нужно будет изменить схему моих записей событий, чтобы хранить там информацию о смещениях.

Есть ли (более простой) способ достижения моего намерения?

Обновление

Фактически мне удалось получить точные начальные и конечные смещения окон в Frame объектах следующим образом

builder.stream("input-topic", Consumed.with(...))
    .transformValues(EventTransformer::new)
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

Но, как уже упоминалось выше, за счет редактирования схемы * 10 28 * объект.

1 Ответ

2 голосов
/ 03 мая 2020

Как получить доступ к смещению записи из функции windowAggregator (aggregate () handler)?

Вы не можете. Ваш подход к использованию transformValues() до агрегирования (и для обогащения объекта Event является правильным подходом.

Было предложено расширить API, чтобы разрешить доступ к метаданным записей в aggregate() и других DSL. операторы, но он никогда не переносился через конечную линию sh (ср. https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams).

...