Таймер моргания не выполняется вовремя - PullRequest
0 голосов
/ 19 февраля 2019

Это дополнительный вопрос к: Триггеру, когда истекает состояние

Я сохраняю состояние каждого входящего элемента в потоке, и после того, как таймер выключается, я удаляю состояние.Это сделано для того, чтобы я мог предотвратить обработку дубликатов до истечения времени ожидания элемента, после чего я смогу снова обработать тот же элемент.Я

Я написал следующий код для тестирования таймеров, но кажется, что таймер срабатывает после того, как все 3 элемента прошли первый ProcessFunction.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);

    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));

    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;

                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) {
                        occur.update(occur.value() + 1);
                        out.collect(value);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                    }
                    else {
                        Thread.sleep(10000);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                        out.collect(value);
                    }
                }
            })
            .keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Tuple2<String, String>> storedTuple;

                @Override
                public void open(Configuration parameters) throws Exception {
                    storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    Tuple2<String, String> stored = storedTuple.value();
                    if (stored == null) {
                        LOGGER.info("[TEST] Storing Tuple {}", value);
                        storedTuple.update(value);
                        out.collect(value);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
                    }
                }
            }

            @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
                    storedTuple.clear();
                }    
            )
            .addSink(new CollectSink());

    streamEnv.execute("Testing");
    for (Tuple2<String, String> tup: CollectSink.values) {
        System.out.println(tup);
    }

}

private static class CollectSink implements SinkFunction<Tuple2<String, String>> {

    static final List<Tuple2<String, String>> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, String> value) throws Exception {
        values.add(value);
    }
}

У меня естьвходной список с 3 дубликатами элементов.В первом ProcessFunction я отправляю первые два элемента как есть, но задерживаю третий элемент на 10 секунд.

Во втором ProcessFunction он фильтрует элемент на основе того, сохранено ли для него состояние или нет,Как и ожидалось, первый элемент сохраняется и отправляется дальше, а второй элемент не существует, так как состояние уже существует.Для первого элемента, кроме отправки, я также установил таймер на 6 секунд, чтобы состояние сбрасывалось после его запуска.

Теперь третий элемент отправляется через 10 секунд, что означает, что 6-секундный триггер должен уже очистить состояние.Однако третий элемент также обрабатывается до срабатывания таймера.Я также вижу вывод как содержащий только 1 копию Tuple, хотя я ожидаю, что 2 копии.

Я добавил некоторые записи, чтобы лучше понять время выполнения.

[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Removing Tuple (Test,test)

Вы можете видеть, что первые два кортежа испускаются вместе, как и ожидалось, с 10-секундной задержкой, после которой испускается третий кортежТеперь Removing Tuple происходит через 10 секунд, даже если он был запущен через 6 секунд после появления первого кортежа.

1 Ответ

0 голосов
/ 19 февраля 2019

Таймер времени события не сработает, пока не будет обработан водяной знак, превышающий время, указанное в таймере.Такой водяной знак может появиться только после обработки третьего события.Кроме того, со временем приема водяные знаки генерируются с использованием периодического генератора водяных знаков и по умолчанию вставляются в поток каждые 200 мсек.

...