Я играл с таймерами Apache Beam, но не могу их запустить.
Насколько я знаю, вы определяете таймер в DoFn следующим образом.
@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
Я выбрал TimeDomain.PROCESSING_TIME
, поскольку моим событиям не назначена временная метка, и я хотел бы запустить таймер, как только закончится окно.
.apply(
"FixedWindow",
Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()
)
.apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));
Я ожидал бы следующий таймер, который находится внутри DoFn, который в основном накапливает объекты в буфере, и после того, как окно сделано, продолжите с конвейером и обработайте набор событий ...
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
@StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
flush(context, bufferedState, bufferedSizeState);
}
... для успешного выполнения. Я что-то упускаю или не понимаю, как работают таймеры в Apache Beam?