Я использую Spotify Scio
для создания конвейера потока данных Scala, который запускается сообщением Pub/Sub
. Он читает из нашего частного DB
, а затем вставляет информацию в BigQuery
.
Проблема:
- Мне нужно удалить предыдущие данные
- Для этого мне нужно использовать расположение записи
WRITE_TRUNCATE
- Но задание автоматически регистрируется как потоковое, и поэтому я получаю следующую ошибку:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- Так что мне нужно вручную изменить конвейер на
Batch
, указав частоту запуска.
Итак, до сих пор у меня был следующий конвейер:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
Не получается найти способ указать частоту запуска при использовании scio
api (saveAsBigQuery
).
Он присутствует только в нативном beam
API:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
Если я использую BigQueryIO
, мне придется использовать sc.pipeline.apply
вместо моего текущего конвейера.
Есть ли способ как-то интегрировать BigQueryIO
в мой текущий конвейер или как-то указать withTriggeringFrequency
в текущем конвейере?