Я написал задание Apache Beam с использованием Scio с целью генерации идентификаторов сеансов для входящих записей данных, а затем каким-то образом их обогащать, прежде чем выводить их в BigQuery.Вот код:
val measurements = sc.customInput("ReadFromPubsub",
PubsubIO
.readMessagesWithAttributes()
.withTimestampAttribute("ts")
.fromSubscription(subscription)
)
measurements
.map(extractMeasurement).flatMap {
case Success(event) =>
Some(event)
case Failure(ex) =>
None
}
.timestampBy(_.timestamp)
.withSessionWindows(sessionGap, WindowOptions(
trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
allowedLateness = Duration.standardDays(1),
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.keyBy(_.clientID)
.groupByKey
.toWindowed
.map(assignSessionID)
.toSCollection.flatMap(_.results)
.map(enrich)
.saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)
Я использую метку времени события, которая является значением ключа атрибута ts
в PubsubMessage
, в качестве моего атрибута метки времени.Это та же самая временная метка, что и в .timestampBy
, которую я использую, до того, как мои данные будут обработаны.Я ожидаю, что выходной триггер сработает, как только водяной знак пройдет мимо sessionGap (по умолчанию 30 минут).
При использовании как Dataflow runner, так и DirectRunner триггер никогда не срабатывает, даже если я моделирую данные с помощьювременные метки с интервалом более 30 минут.В пользовательском интерфейсе потока данных я вижу, что водяной знак никогда не продвигается на основе отметок времени события, а только каждую минуту, как будто данные не были получены.
Я проверил, что данные действительно были получены, так как выполняется преобразование перед оконным управлением.Я также протестировал с около 10 записей в секунду , но, возможно, этого все еще недостаточно для обновления водяного знака?Я также настроил JobTest, в котором я получаю ожидаемый результат, а также сигнализирую мне, что проблема основана на отметке времени / водяного знака.
Я уверен, что пропустил что-то важное в документации или где-то допустил глупую ошибку и надеялся, что кто-то может указать мне правильное направление.