Причина, по которой ваш тест не пройден, заключается в том, что окно никогда не запускается. Задание завершается до того, как окно достигает конца выделенного времени.
Причина этого связана с тем, как вы работаете со временем. Указывая
.keyBy("id","timestampRoundedToMinutes")
, вы организуете для всех событий для одного и того же идентификатора и с отметками времени в течение одной минуты, чтобы они были в одном и том же окне. Но поскольку вы используете управление временем обработки (а не окно времени событий), ваш windows не закроется, пока время дня, когда выполняется тест, не пересекает границу от одной минуты до следующей. Поскольку для обработки требуется всего четыре события, ваша работа вряд ли будет работать достаточно долго, чтобы это произошло.
Вместо этого вам нужно сделать что-то вроде этого: установите для характеристики времени c время события и обеспечить извлечение метки времени и присваивание водяного знака. Обратите внимание, что при этом нет необходимости вводить метку времени, округленную до минутных границ - это часть того, что время события windows делает в любом случае.
public static void main(String[] args) throws Exception {
...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
env.execute();
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(/* delay to handle out-of-orderness */);
}
@Override
public long extractTimestamp(Event event) {
return event.timestamp;
}
}
См. Документацию и учебные пособия для получения дополнительной информации о времени события, водяных знаках и управлении окнами.