Я экспортировал шаблон облачного потока данных из Dataprep, как описано здесь:
https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556
В Dataprep поток извлекает текстовые файлы через подстановочный знак из Google Cloud Storage, преобразуетданные и добавляет их в существующую таблицу BigQuery.Все работает, как задумано.
Однако при попытке запустить задание потока данных из экспортированного шаблона я не могу правильно настроить параметры запуска.Сообщения об ошибках не слишком специфичны, но ясно, что, с одной стороны, я неправильно понимаю расположение (ввод и вывод).
Единственный предоставленный Google шаблон для этого варианта использования (находится на https://cloud.google.com/dataflow/docs/guides/templates/provided-templates#cloud-storage-text-to-bigquery) не применяется, так как использует UDF, а также работает в пакетном режиме, перезаписывая любую существующую таблицу BigQuery, а не добавляя.
Проверка исходных данных задания потока данных из Dataprep показывает ряд параметров (находится в файле метаданных), но я не смог заставить их работать в моем коде. Вот пример одной такой неудачной конфигурации:
import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def dummy(event, context):
pass
def process_data(event, context):
credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials=credentials)
data = event
gsclient = storage.Client()
file_name = data['name']
time_stamp = time.time()
GCSPATH="gs://[path to template]
BODY = {
"jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
"parameters": {
"inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
"outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
"customGcsTempLocation": "gs://[my bucket]/dataflow"
},
"environment": {
"zone": "us-east1-b"
}
}
print(BODY["parameters"])
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
print(response)
В приведенном выше примере указано недопустимое поле ("location1", который я извлек из завершенного задания Dataflow. Я знаю, что мне нужно указать местоположение GCS, расположение шаблона и таблицу BigQuery, но нигде не нашел правильный синтаксис. Как уже упоминалось выше, я нашел имена полей и примеры значенийв сгенерированном файле метаданных задания.
Я понимаю, что этот конкретный вариант использования может не промытьЕсли бы у кого-то были колокола, но в целом, если бы кто-то успешно определил и использовал правильные параметры запуска для задания потока данных, экспортированного из Dataprep, я был бы очень признателен за дополнительную информацию об этом.Thx.