Задержка фиксированного окна от запуска на несколько минут - PullRequest
0 голосов
/ 04 июля 2019

Использование фиксированной Windows в Apache Beam.Водяной знак устанавливается по времени события.

Некоторые данные могут прийти не в порядке и привести к закрытию окна.

Как можно определить триггер в Java, например, через 2 минуты после просмотра последних данных?

Ответы [ 2 ]

0 голосов
/ 12 июля 2019

Я отказался от использования Beam и внедрил решение в Kafka Streams.

Я в основном сгруппировал, затем использовал фиксированные окна и агрегировал результат. «Благодать» в окне позволяет данным поступать поздно.

KGroupedStream<Long, OxyStreamItem> grouped = input.groupByKey();

TimeWindowedKStream<Long, OxyStreamItem> windowed = 
  grouped.windowedBy(
    TimeWindows.of(WIN_SIZE)
            .advanceBy(WIN_SIZE)
            .grace(Duration.ofSeconds(5L)));

return windowed
         .aggregate(
            makeInitializer(),
            makeAggregator(),
            Materialized
              .<Long, Aggregate, WindowStore<Bytes, byte[]>>as("tmp")
              .withValueSerde(new AggregateSerde()))
         .suppress(
            Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream()
         .map(calculateAvg());
0 голосов
/ 09 июля 2019

Не совсем ясно, какое поведение вы ожидаете.Один вопрос: что вы ожидаете, если данные поступят в течение двух минут?Хотите перезапустить двухминутный интервал, не перезапускать его, переиздавать данные или нет?

Похоже, что триггер, который вы пытаетесь описать, выглядит примерно так:

  • ожидание, пока водяной знак не достигнет конца окна, во время события;
  • ожидание дополнительных 2 минут во время обработки;
  • отправка данных;

Если на шаге 2 это было время события, т.е. вы хотели переиздать окно, если прибывает поздний элемент, который вписывается в window + 2min, то вы можете использовать withAllowedLateness().Хотя он звучит не так, как вы хотите, потому что он может продолжать переиздавать содержимое окна каждый раз, когда приходит соответствующий поздний элемент.

Со временем обработки на шаге 2 это вообще невозможно в случае базовых триггеров, которыедоступны в Beam.Вы, вероятно, можете добиться желаемого поведения, если вы вручную управляете состоянием и таймерами в своем собственном ParDo, например, вы можете наблюдать за входящими элементами, отслеживать их в состоянии, а затем по таймеруто, что ты хочешь.Это может стать очень сложным и, возможно, все еще не достаточно гибким для вашего конкретного случая использования.

Одна из основных проблем заключается в том, что в Beam вообще нет хорошего способа определения триггеров времени обработки.Было бы сложно определить общий механизм работы с таймерами таким образом.Например, когда вы хотите выразить "wait for 2 minutes", фреймворк должен понимать, что означают эти две минуты, когда запускать таймер, поэтому вам также нужен механизм для выражения этого.А с композицией, продолжением и другими сложностями это не так просто рассуждать.Так что это не в рамках в этом общем виде.

Чтобы реализовать только "wait for 2 minutes after the last element was seen in the window", структура должна следить за ним и устанавливать таймер.Технически возможно сделать что-то подобное, но, похоже, никто этого еще не сделал.

Кажется, в Beam имеется только один значимый триггер времени обработки, но он не является общимдостаточно и не делает то, что вы хотите.Вы можете посмотреть составные триггеры, такие как AfterFirst или AfterAll, но они, вероятно, не помогут вам без лучшего общего триггера времени обработки.

...