Flink BoundedOutOfOrdernessGenerator - проблемы с водяными знаками - PullRequest
0 голосов
/ 06 мая 2019

Я пытаюсь построить свою собственную реализацию BoundedOutOfOrdernessGenerator, как предложено в документации Flink , и у меня возникают некоторые проблемы, связанные с этим, поскольку кажется, что водяной знак не обновляется корректно.Windows никогда не запускается, потому что водяной знак не продвигается.

Это пара соответствующих фрагментов, где myWatermarkAssigner - это WatermarkGenerator экземпляр:

Основной поток:

dataLeft = dataLeft.assignTimestampsAndWatermarks(myWatermarkAssigner);
dataRight = dataRight.assignTimestampsAndWatermarks(myWatermarkAssigner);

DataStream<MyWindow> output = dataLeft
    .keyBy("field")
    .coGroup(dataRight.keyBy("field"))
    .where(dataL -> dataL.id)
    .equalTo(dataR -> dataR.id)
    .window(TumblingEventTimeWindows.of(duration))
    .apply(myProcessor);

Класс генератора:

public class WatermarkGenerator<T extends MyEvent> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    private long maxOutOfOrderness = 15000; // 15 seconds
    private long currentMaxTimestamp;

    public WatermarkGenerator(long maxTimeLag) {
        this.maxOutOfOrderness = maxTimeLag;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.eventTime.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

Если я добавлю простой журнал внутри getCurrentWatermark(), это будут результаты:

0
0
0
0
0
1557157602000 // First element just arrived
0
0
0
1557157602000
0
0
0
1557157602000
0
0
0

И окна никогда не запускаются, я понимаю, что это потому, что водяной знак иногда равен 0, а иногда и правильному значению.

Может ли это быть связано с тем, что я использую один и тот же экземпляр WatermarkGenerator для dataLeft и dataRight, и я получаю только события на dataLeft?

Кроме того, если я использую вместо этого системное время для водяного знака, как это, окна запускаются и конвейер работает какОчарование:

// return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return new Watermark(System.currentTimeMillis() - maxOutOfOrderness);

В качестве некоторых заключительных замечаний я использую .setAutoWatermarkInterval(1000L), и я попробовал с встроенную BoundedOutOfOrdernessTimestampExtractor реализацию с такими же ошибочными результатами.

...