Я создал пользовательский триггер и функцию обработки для своего потока событий.
DataStream<DynamoDBRow> dynamoDBRows =
sensorEvents
.keyBy("id")
.window(GlobalWindows.create())
.trigger(new MyCustomTrigger())
.allowedLateness(Time.minutes(1)) # Note
.process(new MyCustomWindowProcessFunction());
Мой триггер основан на параметре события. После получения сигнала об окончании события MyCustomWindowProcessFunction () применяется к элементам окна.
@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
Может быть несколько данных датчика, которые могут поступить даже после запуска. Поэтому я добавил .allowedLateness(Time.minutes(1))
, чтобы гарантировать, что эти события не будут пропущены при обработке.
В моем случае allowLateness не работает.
После перехода в документах я нашел это
Как включить allowLateness в GlobalWindow?
Примечание : Я также пытался установить параметры времени среды c
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Обновление: 20-02-2020
В настоящее время думаю о следующем подходе , (Пока не работает)
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onEvenTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}