Обычное окно обработки длительностью в одну секунду предоставит вам окно, содержащее все события, которые происходят в течение одной секунды, за любую секунду, в которой есть хотя бы одно событие. Но это окно не будет выровнено по первому событию; он будет выровнен по времени суток. Так, например, если первое событие в окне происходит на полпути в течение данной секунды, то это окно будет включать в себя только события для 500 мсек c, следующих за первым.
A ProcessingTimeTrigger
срабатывает один раз в конце окна. A CountinousProcessingTimeTrigger
запускается несколько раз с определенной скоростью.
Чтобы получить именно ту семантику, которую вы описали, вам понадобится пользовательский триггер. Вы можете сделать что-то похожее на этот OneSecondIntervalTrigger пример , за исключением того, что вы захотите переключиться с использования времени события на время обработки и запускать только один раз, а не повторно.
Это приведет к Вы с чем-то вроде этого:
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
@Override
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// firstSeen will be false if not set yet
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
// register initial timer only for first element
if (firstSeen.value() == null) {
// FIRE the window 1000 msec after the first event
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + 1000);
fireSeen.update(true);
}
// Continue. Do not evaluate window now
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Continue. We don't use event time timers
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Evaluate the window now
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
// Clear trigger state
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
firstSeen.clear();
}
}