Apache Beam Сохранение в BigQuery с использованием Scio и явное указание TriggeringFrequency - PullRequest
0 голосов
/ 17 июня 2019

Я использую Spotify Scio для создания конвейера потока данных Scala, который запускается сообщением Pub/Sub. Он читает из нашего частного DB, а затем вставляет информацию в BigQuery.

Проблема:

  • Мне нужно удалить предыдущие данные
  • Для этого мне нужно использовать расположение записи WRITE_TRUNCATE
  • Но задание автоматически регистрируется как потоковое, и поэтому я получаю следующую ошибку: WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
  • Так что мне нужно вручную изменить конвейер на Batch, указав частоту запуска.

Итак, до сих пор у меня был следующий конвейер:

sc
  .customInput("Job Trigger", inputIO)
  .map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
  .flatten
  .withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
  .groupBy(_.ssoId)
  .map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)

Не получается найти способ указать частоту запуска при использовании scio api (saveAsBigQuery).

Он присутствует только в нативном beam API:

BigQueryIO
  .write()
  .withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
  .to(bqTableName)
  .withSchema(getSchema)
  .withCreateDisposition(CREATE_NEVER)
  .withWriteDisposition(WRITE_TRUNCATE)

Если я использую BigQueryIO, мне придется использовать sc.pipeline.apply вместо моего текущего конвейера.

Есть ли способ как-то интегрировать BigQueryIO в мой текущий конвейер или как-то указать withTriggeringFrequency в текущем конвейере?

1 Ответ

0 голосов
/ 17 июня 2019

Scio в настоящее время не поддерживает указание метода, который будет использоваться для загрузки данных в Big Query.Поскольку это невозможно, автоматически STREAMING_INSERTS используется для неограниченных коллекций, которые, очевидно, не могут поддерживать усечение.Следовательно, вам необходимо вернуться к BigQueryIO Beam, указав частоту запуска (withTriggeringFrequency(...)) и метод (withMethod(Method.FILE_LOADS)).

Чтобы интегрировать его в свой конвейер Scio, вы можете просто использовать saveAsCustomOutput.Пример также можно найти здесь: https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library

...