Apache Flink - явный триггер вызова для глобальных окон - PullRequest
0 голосов
/ 04 сентября 2018

Я создаю глобальное окно для своего потока и пытаюсь собрать данные за n минут. То, что я сделал, реализовано с помощью собственной логики в функции onElement пользовательского триггера, которая запускает окно через n минут. Но у меня есть проблема, если я не получаю событие для определенного ключа после t минут, когда t , триггер для этого ключа никогда не срабатывает. Скажите, пожалуйста, могу ли я явно вызвать триггер для этой клавиши через n минут.

Код

Основной:

DataStream<EventData> dataStream = 
                    keyedEvents.window(GlobalWindows.create())
                    .trigger(new ServiceWindowTrigger())
                    .process(new ProcessEventWindow(configData))
                    .name("Collection Window")
                    .setParallelism(25);

Триггер:

public class ServiceWindowTrigger extends Trigger< MonitoringConsoleData, GlobalWindow> {


@Override
public TriggerResult onElement(MonitoringConsoleData element, long timestamp, GlobalWindow window,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {



    String key = element.getKey();
    Long firstEventTime = Main.MainMap.get(key);

    // Find difference between first event timestamp and current timestamp
    Long diff = GlobalUtilities.getCurrentEpoch() - firstEventTime;

    long diffMinutes = diff / (60 * 1000) % 60;

    if(diffMinutes == 3)      // Trigger after 3 minutes
    {
        System.out.println(diff);
        System.out.println(diffMinutes);

        return TriggerResult.FIRE;
    }
    else
    {
        return TriggerResult.CONTINUE;
    }


}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {
    // TODO Auto-generated method stub
    return null;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) throws Exception {
    // TODO Auto-generated method stub

    return null;
}

@Override
public void clear(GlobalWindow window, org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
        throws Exception {
    // TODO Auto-generated method stub

}

}
...