Параметры экспортированного шаблона потока данных неизвестны - PullRequest
0 голосов
/ 11 февраля 2019

Я экспортировал шаблон облачного потока данных из Dataprep, как описано здесь:

https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556

В Dataprep поток извлекает текстовые файлы через подстановочный знак из Google Cloud Storage, преобразуетданные и добавляет их в существующую таблицу BigQuery.Все работает, как задумано.

Однако при попытке запустить задание потока данных из экспортированного шаблона я не могу правильно настроить параметры запуска.Сообщения об ошибках не слишком специфичны, но ясно, что, с одной стороны, я неправильно понимаю расположение (ввод и вывод).

Единственный предоставленный Google шаблон для этого варианта использования (находится на https://cloud.google.com/dataflow/docs/guides/templates/provided-templates#cloud-storage-text-to-bigquery) не применяется, так как использует UDF, а также работает в пакетном режиме, перезаписывая любую существующую таблицу BigQuery, а не добавляя.

Проверка исходных данных задания потока данных из Dataprep показывает ряд параметров (находится в файле метаданных), но я не смог заставить их работать в моем коде. Вот пример одной такой неудачной конфигурации:

import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def dummy(event, context):
    pass

def process_data(event, context):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    data = event
    gsclient = storage.Client()
    file_name = data['name']

    time_stamp = time.time()

    GCSPATH="gs://[path to template]
    BODY = {
        "jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
        "parameters": {
            "inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
            "outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
            "customGcsTempLocation": "gs://[my bucket]/dataflow"
         },
         "environment": {
            "zone": "us-east1-b"
         }
    }

    print(BODY["parameters"])

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = request.execute()

    print(response)

В приведенном выше примере указано недопустимое поле ("location1", который я извлек из завершенного задания Dataflow. Я знаю, что мне нужно указать местоположение GCS, расположение шаблона и таблицу BigQuery, но нигде не нашел правильный синтаксис. Как уже упоминалось выше, я нашел имена полей и примеры значенийв сгенерированном файле метаданных задания.

Я понимаю, что этот конкретный вариант использования может не промытьЕсли бы у кого-то были колокола, но в целом, если бы кто-то успешно определил и использовал правильные параметры запуска для задания потока данных, экспортированного из Dataprep, я был бы очень признателен за дополнительную информацию об этом.Thx.

1 Ответ

0 голосов
/ 27 августа 2019

Я думаю, что вам нужно просмотреть этот документ, в нем точно объясняется синтаксис, необходимый для передачи различных доступных параметров конвейера, включая необходимые параметры местоположения ... 1

...