Почему моргание не сбрасывает поздние данные? - PullRequest
0 голосов
/ 28 марта 2019

Я рассчитываю максимальное значение простого пара, и результат:

(S1,1000, S1, значение: 999)

(S1,2000, S1, значение: 41)

Последняя строка данных явно опаздывает: new SensorReading("S1", 999, 100L)

почему он был рассчитан по первому окну (0-1000)?

Я думаю, что первое окно должно сработать, когда придет SensorReading("S1", 41, 1000L).

Я очень смущен этим результатом.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(TrainingBase.parallelism);

        DataStream<SensorReading> input = env.fromElements(
                new SensorReading("S1", 35, 500L),
                new SensorReading("S1", 42, 999L),
                new SensorReading("S1", 41, 1000L),
                new SensorReading("S1", 40, 1200L),
                new SensorReading("S1", 23, 1400L),
                new SensorReading("S1", 999, 100L)
        );


        input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
            private long currentMaxTimestamp;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp);
            }

            @Override
            public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
                currentMaxTimestamp = element.ts;
                return currentMaxTimestamp;
            }
        })
                .keyBy((KeySelector<SensorReading, String>) value -> value.sensorName)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .reduce(new MyReducingMax(), new MyWindowFunction())
                .print();

        env.execute();

MyReducingMax (), MyWindowFunction ()

private static class MyReducingMax implements ReduceFunction<SensorReading> {
        public SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.getValue() > r2.getValue() ? r1 : r2;
        }
    }

private static class MyWindowFunction extends
            ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

        @Override
        public void process(
                String key,
                Context context,
                Iterable<SensorReading> maxReading,
                Collector<Tuple3<String, Long, SensorReading>> out) {

            SensorReading max = maxReading.iterator().next();
            out.collect(new Tuple3<>(key, context.window().getEnd(), max));
        }
    }

    public static class SensorReading {
        String sensorName;
        int value;
        Long ts;

        public SensorReading() {
        }

        public SensorReading(String sensorName, int value, Long ts) {
            this.sensorName = sensorName;
            this.value = value;
            this.ts = ts;
        }

        public Long getTs() {
            return ts;
        }

        public void setTs(Long ts) {
            this.ts = ts;
        }

        public String getSensorName() {
            return sensorName;
        }

        public void setSensorName(String sensorName) {
            this.sensorName = sensorName;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public String toString() {

            return this.sensorName + "(" + this.ts + ") value: " + this.value;
        }

        ;
    }

1 Ответ

1 голос
/ 28 марта 2019

AssignerWithPeriodicWatermarks не создает водяной знак при каждой мыслимой возможности. Вместо этого Flink периодически вызывает такого присваивателя для получения последнего водяного знака, и по умолчанию это выполняется каждые 200 мсек (в реальном времени, а не во время события). Этот интервал контролируется ExecutionConfig.setAutoWatermarkInterval (...) .

Это означает, что все шесть ваших тестовых событий почти наверняка были обработаны до того, как мог быть вызван ваш присваиватель водяных знаков.

Если вы хотите иметь более предсказуемые водяные знаки, вместо этого вы можете использовать AssignerWithPunctuatedWatermarks.

Кстати, так, как написан ваш присваиватель водяных знаков, все события не по порядку могут опоздать. Более типично использовать BoundedOutOfOrdernessTimestampExtractor, который допускает некоторую неупорядоченность.

...