Присвоение GenericRecord метки времени из внутреннего объекта - PullRequest
1 голос
/ 12 июня 2019

Обработка потоковых событий и запись файлов в почасовые сегменты является проблемой из-за окон, поскольку некоторые события из входящего часа могут переходить в предыдущие и т. Д.

Я копался в Apache Beam и его триггерахно я изо всех сил пытаюсь управлять запуском с помощью временной метки следующим образом ...

Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .triggering(AfterProcessingTime
                     .pastFirstElementInPane()
                     .plusDelayOf(Duration.standardSeconds(1)))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())

Это то, что я делал до сих пор, запуск 1-минутных окон независимо от того, какая временная метка.Тем не менее, я хотел бы включить отметку времени в объект, чтобы она запускалась только для тех, кто находится внутри.

Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark
                    .pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())

У объектов, с которыми я имею дело, есть объект отметки времени, однако это длинныйполе, а не поле Instant.

"{ \"name\": \"timestamp\", \"type\": \"long\", \"logicalType\": \"timestamp-micros\" },"

Наличие моего класса POJO с этим полем long ничего не вызывает, но если я поменяю его на класс Instant и правильно воссоздаю объект,следующая ошибка выдается всякий раз, когда читается сообщение PubSub.

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Long

Я также думал о создании своего рода класса-оболочки вокруг GenericRecord, который содержит метку времени, но должен был быпросто используйте часть GenericRecord, как только она будет готова для записи с FileIO в .parquet.

Какие еще способы использовать триггеры водяных знаков?

EDIT : после @Антон комментирует, я пробовал следующее.

.apply("Apply timestamps", WithTimestamps.of(
            (SerializableFunction<GenericRecord, Instant>) item -> new Instant(Long.valueOf(item.get("timestamp").toString())))
        .withAllowedTimestampSkew(Duration.standardSeconds(30)))

Даже если это устарело, похоже, что оно проходит через конвейер, но все еще не записано (все еще отбрасываетсяпредварительная запись по какой-либо причине с помощью ранее показанного триггера?).

А также попробовал другой упомянутый подход, используя outputWithTimestamp, но из-за задержки выводит следующую ошибку ...

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2019-06-12T18:59:58.609Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-12T18:59:59.848Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
...