Как реализовать триггер в Flink, который буферизирует до истечения времени ожидания и срабатывает по истечении времени ожидания? - PullRequest
1 голос
/ 17 марта 2020

Как реализовать триггер в Flink, который буферизует до истечения времени ожидания и срабатывает по истечении времени ожидания?

Я хочу, чтобы триггер был зарегистрирован, если в окне есть хотя бы один элемент, а затем буферизировал до секунды и сработать, когда пройдет одна секунда. Если в окне нет элементов, то триггер не будет регистрироваться сам, поэтому я не ожидаю увидеть какие-либо выходные данные.

Я не хочу, чтобы триггер генерировал много трафика c каждую секунду, независимо от наличия элементов в окно или нет. с другой стороны, говорят, что в окне есть только один элемент, я не хочу, чтобы он сидел там и ждал до водяного знака или навсегда. Вместо этого я хочу установить тайм-аут, чтобы я мог видеть, что один элемент, по крайней мере, через секунду.

Делает ли это ProcessingTimeTrigger.create()? если так, что отличается между ProcessingTimeTrigger.create() против CountinousProcessingTimeTrigger?

1 Ответ

1 голос
/ 17 марта 2020

Обычное окно обработки длительностью в одну секунду предоставит вам окно, содержащее все события, которые происходят в течение одной секунды, за любую секунду, в которой есть хотя бы одно событие. Но это окно не будет выровнено по первому событию; он будет выровнен по времени суток. Так, например, если первое событие в окне происходит на полпути в течение данной секунды, то это окно будет включать в себя только события для 500 мсек c, следующих за первым.

A ProcessingTimeTrigger срабатывает один раз в конце окна. A CountinousProcessingTimeTrigger запускается несколько раз с определенной скоростью.

Чтобы получить именно ту семантику, которую вы описали, вам понадобится пользовательский триггер. Вы можете сделать что-то похожее на этот OneSecondIntervalTrigger пример , за исключением того, что вы захотите переключиться с использования времени события на время обработки и запускать только один раз, а не повторно.

Это приведет к Вы с чем-то вроде этого:

public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {

    @Override
    public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // firstSeen will be false if not set yet
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

        // register initial timer only for first element
        if (firstSeen.value() == null) {
            // FIRE the window 1000 msec after the first event
            long now = ctx.getCurrentProcessingTime();
            ctx.registerProcessingTimeTimer(now + 1000);
            fireSeen.update(true);
        }
        // Continue. Do not evaluate window now
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Continue. We don't use event time timers
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Evaluate the window now
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
        // Clear trigger state
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
        firstSeen.clear();
    }
}
...