@OnTimer не срабатывает после окна - PullRequest
0 голосов
/ 03 июля 2019

Я играл с таймерами Apache Beam, но не могу их запустить.

Насколько я знаю, вы определяете таймер в DoFn следующим образом.

@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

Я выбрал TimeDomain.PROCESSING_TIME, поскольку моим событиям не назначена временная метка, и я хотел бы запустить таймер, как только закончится окно.

        .apply(
             "FixedWindow",
            Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )
        .apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));

Я ожидал бы следующий таймер, который находится внутри DoFn, который в основном накапливает объекты в буфере, и после того, как окно сделано, продолжите с конвейером и обработайте набор событий ...

        @OnTimer("expiry")
    public void onExpiry(
        OnTimerContext context,
        @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
        @StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
        flush(context, bufferedState, bufferedSizeState);
    }

... для успешного выполнения. Я что-то упускаю или не понимаю, как работают таймеры в Apache Beam?

1 Ответ

0 голосов
/ 03 июля 2019

Вы можете проверить [1], где есть примеры использования таймера.

Вам необходимо установить время срабатывания таймеров [2], где может быть место, которое пропустило.

[1] https://beam.apache.org/blog/2017/08/28/timely-processing.html

[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java#L53

...