Как вызвать услугу передачи данных по запросу BigQuery? - PullRequest
0 голосов
/ 18 января 2020

Мне очень понравилась служба передачи данных BigQuery. У меня есть плоские файлы в точной схеме, которые можно загрузить в BQ. Было бы здорово просто настроить расписание DTS, которое будет подбирать файлы GCS, соответствующие шаблону, и загружать их в BQ. Мне нравится встроенная опция для удаления исходных файлов после копирования и электронной почты в случае возникновения проблем. Но самый большой облом в том, что минимальный интервал составляет 60 минут. Это безумно. Я мог бы жить с 10-минутной задержкой, возможно.

Итак, если я настроил DTS по требованию, как я могу вызвать его из API? Я думаю создать cronjob, который вызывает его по требованию каждые 10 минут. Но я не могу понять через документы, как это назвать.

Кроме того, каков мой второй самый надежный и дешевый способ перемещения файлов GCS (не требующий ETL) в таблицы bq, которые соответствуют точной схеме. Должен ли я использовать Cloud Scheduler, Cloud Functions, DataFlow, Cloud Run и т. Д. c.

Если я использую Cloud Function, как я могу отправить все файлы в моем GCS во время вызова как одно задание загрузки bq?

Наконец, кто-нибудь знает, понизит ли DTS лимит в будущем до 10 минут?

1 Ответ

0 голосов
/ 18 января 2020

Итак, если я настроил DTS по требованию, как я могу вызвать его из API? Я думаю создать cronjob, который вызывает его по требованию каждые 10 минут. Но я не могу понять через документы, как это назвать.

StartManualTransferRuns является частью RP C библиотеки , но не имеет REST API-эквивалент на данный момент. Как использовать это будет зависеть от вашей среды. Например, вы можете использовать клиентскую библиотеку Python ( docs ).

В качестве примера я использовал следующий код (вам нужно запустить pip install google-cloud-bigquery-datatransfer для зависимостей ):

import time

from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp


client = bigquery_datatransfer_v1.DataTransferServiceClient()

PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc'  # alphanumeric ID you'll find in the UI 

parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)

start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))

response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)

Обратите внимание, что вам необходимо использовать правильный идентификатор конфигурации передачи, а requested_run_time должен иметь тип bigquery_datatransfer_v1.types.Timestamp (для которого не было примеров в документации). Я установил время начала на 10 секунд раньше текущего времени выполнения.

Вы должны получить ответ, такой как:

runs {
  name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
  destination_dataset_id: "DATASET_NAME"
  schedule_time {
    seconds: 1579358571
    nanos: 922599371
  }
  ...
  data_source_id: "google_cloud_storage"
  state: PENDING
  params {
    ...
  }
  run_time {
    seconds: 1579358581
  }
  user_id: 28...65
}

, и передача запускается, как и ожидалось (не обращая внимания на ошибку):

enter image description here

Кроме того, каков мой второй самый надежный и дешевый способ переноса файлов GCS (ETL не требуется) в bq таблицы, которые соответствуют точной схеме. Должен ли я использовать Cloud Scheduler, Cloud Functions, DataFlow, Cloud Run и т. Д. c.

С этим вы можете настроить задание cron для выполнения вашей функции каждые десять минут. Как указывалось в комментариях, минимальный интервал составляет 60 минут, поэтому он не будет принимать файлы, возраст которых менее одного часа ( docs ).

Кроме того, это не очень надежное решение, и здесь вступают в игру ваши дополнительные вопросы. Я думаю, что они могут быть слишком широкими, чтобы их можно было рассмотреть в одном вопросе StackOverflow, но я бы сказал, что для refre по требованию sh Cloud Scheduler + Cloud Functions / Cloud Run может работать очень хорошо.

Поток данных будет лучше, если вам нужен ETL, но у него есть разъем GCS, который может просматривать шаблон файла ( пример ). При этом вы пропустите передачу, установите интервал просмотра и частоту запуска задания загрузки, чтобы записать файлы в BigQuery. Виртуальные машины будут постоянно работать в потоковом конвейере, в отличие от предыдущего подхода, но возможен 10-минутный период наблюдения.

Если у вас сложные рабочие процессы / зависимости, Airflow недавно представил операторов для запуска ручных запусков.

Если я использую облачную функцию, как я могу отправить все файлы в моем GCS во время вызова как одно задание загрузки bq?

Вы можете использовать подстановочные знаки , чтобы соответствовать шаблону файла при создании передачи:

enter image description here

Также это можно сделать для каждого файла с помощью Pub / Sub уведомлений для облачного хранилища для запуска функции облака.

Наконец, кто-нибудь знает, если DTS снизит лимит до 10 минут в будущем?

Уже есть запрос на добавление здесь . Не стесняйтесь star , чтобы показать ваш интерес и получать обновления

...