По соображениям производительности API таймера не поддерживает операцию чтения (), которая в подавляющем большинстве случаев не является обязательной функцией.В небольшом наборе случаев использования, где это необходимо, например, когда вам нужно установить Timer в EventTime на основе наименьшей отметки времени, видимой в элементах внутри DoFn, мы можем использовать объект State для отслеживания значения.
Java (SDK 2.10.0)
// In this pattern, a Timer is set to fire based on the lowest timestamp seen in the DoFn.
public class SetEventTimeTimerBasedOnEarliestElementTime {
private static final Logger LOG = LoggerFactory
.getLogger(SetEventTimeTimerBasedOnEarliestElementTime.class);
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.
fromArgs(args).withValidation().as(PipelineOptions.class);
// We will start our timer at a fixed point
Instant now = Instant.parse("2000-01-01T00:00:00Z");
// ----- Create some dummy data
// Create 3 elements, incrementing by 1 minute
TimestampedValue<KV<String, Integer>> time_1 = TimestampedValue.of(KV.of("Key_A", 1), now);
TimestampedValue<KV<String, Integer>> time_2 = TimestampedValue
.of(KV.of("Key_A", 2), now.plus(Duration.standardMinutes(1)));
TimestampedValue<KV<String, Integer>> time_3 = TimestampedValue
.of(KV.of("Key_A", 3), now.plus(Duration.standardMinutes(2)));
Pipeline p = Pipeline.create(options);
// Apply a fixed window of duration 10 min and Sum the results
p.apply(Create.timestamped(time_3, time_2, time_1)).apply(
Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(10))))
.apply(ParDo.of(new StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp()));
p.run();
}
/**
* Set timer to the lowest value that we see in the stateful DoFn
*/
public static class StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp
extends DoFn<KV<String, Integer>, KV<String, Integer>> {
// Due to performance considerations there is no read on a timer object.
// We make use of this Long value to keep track.
@StateId("currentTimerValue") private final StateSpec<ValueState<Long>> currentTimerValue =
StateSpecs.value(BigEndianLongCoder.of());
@TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement public void process(ProcessContext c,
@StateId("currentTimerValue") ValueState<Long> currentTimerValue,
@TimerId("timer") Timer timer) {
Instant timeStampWeWantToSet = c.timestamp();
//*********** Set Timer
// If the timer has never been set then we set it.
// If the timer has been set but is larger than our current value then we set it.
if (currentTimerValue.read() == null || timeStampWeWantToSet.getMillis() < currentTimerValue
.read()) {
timer.set(timeStampWeWantToSet);
currentTimerValue.write(timeStampWeWantToSet.getMillis());
}
}
@OnTimer("timer") public void onMinTimer(OnTimerContext otc,
@StateId("currentTimerValue") ValueState<Long> currentTimerValue,
@TimerId("timer") Timer timer) {
// Reset the currentTimerValue
currentTimerValue.clear();
LOG.info("Timer @ {} fired", otc.timestamp());
}
}
}