Обработка потоковых событий и запись файлов в почасовые сегменты является проблемой из-за окон, поскольку некоторые события из входящего часа могут переходить в предыдущие и т. Д.
Я копался в Apache Beam и его триггерахно я изо всех сил пытаюсь управлять запуском с помощью временной метки следующим образом ...
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
Это то, что я делал до сих пор, запуск 1-минутных окон независимо от того, какая временная метка.Тем не менее, я хотел бы включить отметку времени в объект, чтобы она запускалась только для тех, кто находится внутри.
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark
.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
У объектов, с которыми я имею дело, есть объект отметки времени, однако это длинныйполе, а не поле Instant
.
"{ \"name\": \"timestamp\", \"type\": \"long\", \"logicalType\": \"timestamp-micros\" },"
Наличие моего класса POJO с этим полем long
ничего не вызывает, но если я поменяю его на класс Instant
и правильно воссоздаю объект,следующая ошибка выдается всякий раз, когда читается сообщение PubSub.
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Long
Я также думал о создании своего рода класса-оболочки вокруг GenericRecord, который содержит метку времени, но должен был быпросто используйте часть GenericRecord, как только она будет готова для записи с FileIO в .parquet.
Какие еще способы использовать триггеры водяных знаков?
EDIT : после @Антон комментирует, я пробовал следующее.
.apply("Apply timestamps", WithTimestamps.of(
(SerializableFunction<GenericRecord, Instant>) item -> new Instant(Long.valueOf(item.get("timestamp").toString())))
.withAllowedTimestampSkew(Duration.standardSeconds(30)))
Даже если это устарело, похоже, что оно проходит через конвейер, но все еще не записано (все еще отбрасываетсяпредварительная запись по какой-либо причине с помощью ранее показанного триггера?).
А также попробовал другой упомянутый подход, используя outputWithTimestamp
, но из-за задержки выводит следующую ошибку ...
Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2019-06-12T18:59:58.609Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-12T18:59:59.848Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.