Итак, если я настроил DTS по требованию, как я могу вызвать его из API? Я думаю создать cronjob, который вызывает его по требованию каждые 10 минут. Но я не могу понять через документы, как это назвать.
StartManualTransferRuns
является частью RP C библиотеки , но не имеет REST API-эквивалент на данный момент. Как использовать это будет зависеть от вашей среды. Например, вы можете использовать клиентскую библиотеку Python ( docs ).
В качестве примера я использовал следующий код (вам нужно запустить pip install google-cloud-bigquery-datatransfer
для зависимостей ):
import time
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = bigquery_datatransfer_v1.DataTransferServiceClient()
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc' # alphanumeric ID you'll find in the UI
parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)
Обратите внимание, что вам необходимо использовать правильный идентификатор конфигурации передачи, а requested_run_time
должен иметь тип bigquery_datatransfer_v1.types.Timestamp
(для которого не было примеров в документации). Я установил время начала на 10 секунд раньше текущего времени выполнения.
Вы должны получить ответ, такой как:
runs {
name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
destination_dataset_id: "DATASET_NAME"
schedule_time {
seconds: 1579358571
nanos: 922599371
}
...
data_source_id: "google_cloud_storage"
state: PENDING
params {
...
}
run_time {
seconds: 1579358581
}
user_id: 28...65
}
, и передача запускается, как и ожидалось (не обращая внимания на ошибку):
Кроме того, каков мой второй самый надежный и дешевый способ переноса файлов GCS (ETL не требуется) в bq таблицы, которые соответствуют точной схеме. Должен ли я использовать Cloud Scheduler, Cloud Functions, DataFlow, Cloud Run и т. Д. c.
С этим вы можете настроить задание cron для выполнения вашей функции каждые десять минут. Как указывалось в комментариях, минимальный интервал составляет 60 минут, поэтому он не будет принимать файлы, возраст которых менее одного часа ( docs ).
Кроме того, это не очень надежное решение, и здесь вступают в игру ваши дополнительные вопросы. Я думаю, что они могут быть слишком широкими, чтобы их можно было рассмотреть в одном вопросе StackOverflow, но я бы сказал, что для refre по требованию sh Cloud Scheduler + Cloud Functions / Cloud Run может работать очень хорошо.
Поток данных будет лучше, если вам нужен ETL, но у него есть разъем GCS, который может просматривать шаблон файла ( пример ). При этом вы пропустите передачу, установите интервал просмотра и частоту запуска задания загрузки, чтобы записать файлы в BigQuery. Виртуальные машины будут постоянно работать в потоковом конвейере, в отличие от предыдущего подхода, но возможен 10-минутный период наблюдения.
Если у вас сложные рабочие процессы / зависимости, Airflow недавно представил операторов для запуска ручных запусков.
Если я использую облачную функцию, как я могу отправить все файлы в моем GCS во время вызова как одно задание загрузки bq?
Вы можете использовать подстановочные знаки , чтобы соответствовать шаблону файла при создании передачи:
Также это можно сделать для каждого файла с помощью Pub / Sub уведомлений для облачного хранилища для запуска функции облака.
Наконец, кто-нибудь знает, если DTS снизит лимит до 10 минут в будущем?
Уже есть запрос на добавление здесь . Не стесняйтесь star , чтобы показать ваш интерес и получать обновления