Как подготовить шаблон потока данных GCP / Apache Beam? - PullRequest
0 голосов
/ 07 января 2020

Хорошо, я должен что-то здесь упустить. Что мне нужно для создания конвейера в качестве шаблона? Когда я пытаюсь выполнить мой шаблон с помощью этих инструкций , он запускает модуль, но ничего не выполняет. Похоже, он работает без ошибок, но я не вижу, чтобы какие-либо файлы фактически добавлялись к местоположению корзины слушайте в моем --template_location. Должен ли мой код python быть там? Я так понимаю, правильно? Я удостоверился, что у меня установлены все SDK луча и Google Cloud, но, может быть, я что-то упустил? Что нужно сделать, чтобы подготовить этот шаблон потока данных? Также я могу вручную просто уронить файл в ведро и запустить его оттуда? Ниже приведен шаблон, с которым я сейчас играю:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

GC_PROJECT = 'my-proj'
BUCKET = 'test-bucket'
STAGING_BUCKET = '%s/test' % BUCKET
TEMP_BUCKET = '%s/test' % BUCKET
# RUNNER = 'DataflowRunner'
RUNNER = 'DirectRunner'

# pipeline_args = ['--save_main_session']
pipeline_args = []
pipeline_args.append('--project=%s' % GC_PROJECT)
pipeline_args.append('--runner=%s' % RUNNER)
pipeline_args.append('--staging_location=gs://%s' % STAGING_BUCKET)
pipeline_args.append('--temp_location=gs://%s' % TEMP_BUCKET)

BQ_DATASET = 'lake'
BQ_TABLE = 'whatever'

SCHEMA_OBJ = [
    {"name": "id", "type": "STRING", "description": ""},
    {"name": "value", "type": "STRING", "description": ""}
]


class ContactUploadOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--infile',
            type=str,
            help='path of input file',
            default='gs://%s/data_files/test.csv' % BUCKET)

def run(argv=None):
    print('running')
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    lines = (p
             | beam.Create([
                {"id": "some random name", "value": "i dont know"},
                {"id": "id2", "value": "whatever man"}]))

    schema_str = '{"fields": ' + json.dumps(SCHEMA_OBJ) + '}'
    schema = parse_table_schema_from_json(schema_str)
    output_destination = '%s.%s' % (BQ_DATASET, BQ_TABLE)
    (lines
        | 'Write lines to BigQuery' >> beam.io.WriteToBigQuery(
            output_destination,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run(pipeline_args)


Кроме того, если кто-то может связать некоторые документы / ресурсы SDK, которые объясняют, как / почему вышеописанные инструкции по сбору должны работать, это было бы здорово!

1 Ответ

1 голос
/ 07 января 2020

Временное местоположение - это место, где временные файлы будут загружены во время выполнения задания. Вы не упомянули «template_location», где будет создан шаблон.

Пожалуйста, смотрите ссылку для создания шаблона и запуска шаблона

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...