Я пытаюсь построить свою собственную реализацию BoundedOutOfOrdernessGenerator
, как предложено в документации Flink , и у меня возникают некоторые проблемы, связанные с этим, поскольку кажется, что водяной знак не обновляется корректно.Windows никогда не запускается, потому что водяной знак не продвигается.
Это пара соответствующих фрагментов, где myWatermarkAssigner
- это WatermarkGenerator
экземпляр:
Основной поток:
dataLeft = dataLeft.assignTimestampsAndWatermarks(myWatermarkAssigner);
dataRight = dataRight.assignTimestampsAndWatermarks(myWatermarkAssigner);
DataStream<MyWindow> output = dataLeft
.keyBy("field")
.coGroup(dataRight.keyBy("field"))
.where(dataL -> dataL.id)
.equalTo(dataR -> dataR.id)
.window(TumblingEventTimeWindows.of(duration))
.apply(myProcessor);
Класс генератора:
public class WatermarkGenerator<T extends MyEvent> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long maxOutOfOrderness = 15000; // 15 seconds
private long currentMaxTimestamp;
public WatermarkGenerator(long maxTimeLag) {
this.maxOutOfOrderness = maxTimeLag;
}
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.eventTime.getTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
Если я добавлю простой журнал внутри getCurrentWatermark()
, это будут результаты:
0
0
0
0
0
1557157602000 // First element just arrived
0
0
0
1557157602000
0
0
0
1557157602000
0
0
0
И окна никогда не запускаются, я понимаю, что это потому, что водяной знак иногда равен 0, а иногда и правильному значению.
Может ли это быть связано с тем, что я использую один и тот же экземпляр WatermarkGenerator
для dataLeft
и dataRight
, и я получаю только события на dataLeft
?
Кроме того, если я использую вместо этого системное время для водяного знака, как это, окна запускаются и конвейер работает какОчарование:
// return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return new Watermark(System.currentTimeMillis() - maxOutOfOrderness);
В качестве некоторых заключительных замечаний я использую .setAutoWatermarkInterval(1000L)
, и я попробовал с встроенную BoundedOutOfOrdernessTimestampExtractor
реализацию с такими же ошибочными результатами.