Если вы измените свой присваиватель водяных знаков таким образом
AssignerWithPunctuatedWatermarks eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
long maxTs = 0;
@Override
public long extractTimestamp(MyEvent myEvent, long l) {
long ts = myEvent.getEventTime();
if (ts > maxTs) {
maxTs = ts;
}
return ts;
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
return new Watermark(maxTs - 10000);
}
};
, тогда вы получите ожидаемые результаты.Я не рекомендую это - просто использую его, чтобы проиллюстрировать, что происходит.
Здесь происходит то, что BoundedOutOfOrdernessTimestampExtractor
- это периодический генератор водяных знаков, который будет вставлять водяной знак в поток каждые 200 мсек (по умолчанию).Поскольку ваша работа завершается задолго до этого, единственным водяным знаком, с которым сталкивается ваша работа, является тот, который Флинк вставляет в конце каждого конечного потока (со значением MAX_WATERMARK).Задержка относится к водяным знакам, и событие, которое вы ожидали опоздать, успевает наступить до этого водяного знака.
Переключаясь на пунктуированные водяные знаки, вы можете заставить водяные знаки появляться чаще или точнее в определенных точках впоток.Как правило, в этом нет необходимости (и слишком частые водяные знаки приводят к накладным расходам), но это полезно, если вы хотите иметь строгий контроль над последовательностью водяных знаков.
Что касается написания тестов, вы можете взглянуть на тестовые наборы , используемые в собственных тестах Флинка, или в flink-spector .
Обновление:
Интервал времени, связанный с BoundedOutOfOrdernessTimestampExtractor, является спецификациейнасколько ожидается выход из строя потока.События, поступающие в пределах этой границы, не считаются запоздалыми, и таймеры времени событий не будут срабатывать до тех пор, пока не истечет эта задержка, что дает время для поступления событий, вышедших из строя.allowLateness применяется только к оконному API и описывает, как долго по истечении обычного времени срабатывания окна каркас сохраняет состояние окна, так что события все еще могут быть добавлены в окно и вызывать поздние срабатывания.По истечении этого дополнительного интервала состояние окна очищается, и последующие события отправляются на боковой выход (если настроено).
Поэтому, когда вы используете BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10))
, вы не говорите «подождите 10 секунд после каждого события на случай, если более ранние события могутеще приеду ".Но вы говорите, что ваши события должны быть не более 10 секунд не в порядке.Таким образом, если вы обрабатываете поток событий в реальном времени, это означает, что вы будете ждать не более 10 секунд в случае поступления более ранних событий.(А если вы обрабатываете исторические данные, то вы можете обрабатывать 10 секунд данных за 1 секунду или нет - знание того, что вы будете ждать n секунд времени передачи события, ничего не говорит о том, сколько на самом деле это займет времени.)
Подробнее об этом см. Время события и водяные знаки .