Я создаю глобальное окно для своего потока и пытаюсь собрать данные за n минут. То, что я сделал, реализовано с помощью собственной логики в функции onElement пользовательского триггера, которая запускает окно через n минут. Но у меня есть проблема, если я не получаю событие для определенного ключа после t минут, когда t , триггер для этого ключа никогда не срабатывает. Скажите, пожалуйста, могу ли я явно вызвать триггер для этой клавиши через n минут.
Код
Основной:
DataStream<EventData> dataStream =
keyedEvents.window(GlobalWindows.create())
.trigger(new ServiceWindowTrigger())
.process(new ProcessEventWindow(configData))
.name("Collection Window")
.setParallelism(25);
Триггер:
public class ServiceWindowTrigger extends Trigger< MonitoringConsoleData, GlobalWindow> {
@Override
public TriggerResult onElement(MonitoringConsoleData element, long timestamp, GlobalWindow window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {
String key = element.getKey();
Long firstEventTime = Main.MainMap.get(key);
// Find difference between first event timestamp and current timestamp
Long diff = GlobalUtilities.getCurrentEpoch() - firstEventTime;
long diffMinutes = diff / (60 * 1000) % 60;
if(diffMinutes == 3) // Trigger after 3 minutes
{
System.out.println(diff);
System.out.println(diffMinutes);
return TriggerResult.FIRE;
}
else
{
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public void clear(GlobalWindow window, org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
}