Ошибка «Невозможно разобрать файл» при запуске пользовательского шаблона в потоке данных - PullRequest
4 голосов
/ 15 апреля 2019

Я пытаюсь написать собственный шаблон для чтения CSV и вывести его в другой CSV. Цель состоит в том, чтобы выбрать нужные данные в этом CSV. Когда я запускаю его в веб-интерфейсе, у меня появляется следующая ошибка

Я максимально уменьшил код, чтобы понять свою ошибку, но все еще не вижу ее. Я помог себе с документацией: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#creating-and-staging-templates

class UploadOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://[MYBUCKET]/input.csv',
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            required=True,
            help='Output file to write results to.')

pipeline_options = PipelineOptions(['--output', 'gs://[MYBUCKET]/output'])
p = beam.Pipeline(options=pipeline_options)
upload_options = pipeline_options.view_as(UploadOptions)

(p
    | 'read' >> beam.io.Read(upload_options.input)
    | 'Write' >> beam.io.WriteToText(upload_options.output, file_name_suffix='.csv'))

Текущая ошибка выглядит следующим образом

Невозможно проанализировать файл 'gs: //MYBUCKET/template.py'.

В терминале у меня следующая ошибка

ОШИБКА: (gcloud.dataflow.jobs.run) FAILED_PRECONDITION: Невозможно проанализировать файл шаблона 'gs: // [MYBUCKET] /template.py'. - '@type': type.googleapis.com/google.rpc.PreconditionFailure нарушения: - описание: "Неожиданный конец потока: ожидается '{'" тема: 0: 0 тип: JSON

Заранее спасибо

1 Ответ

1 голос
/ 18 апреля 2019

Мне удалось решить мою проблему. Проблема возникла из-за переменной, которую я использовал при чтении моего конвейера. Переменная custom_options должна использоваться в переменной Read, а не в переменной known_args

custom_options = pipeline_options.view_as(CustomPipelineOptions)

Я сделал общий код и поделился своим решением, если кому-то это понадобится.

from __future__ import absolute_import
import argparse

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions

class CustomPipelineOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    path and organization parameters are necessary for execution of pipeline
    campaign is optional for committing to bigquery
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Output file if needed')

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(pipeline_args)
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                        | 'Hello World' >> beam.Create(['Hello World'])
                        | 'Read Input path' >> beam.Read(custom_options.path)
                 )

    result = p.run()
    # result.wait_until_finish

if __name__ == '__main__':
    run()

Затем запустите следующую команду, чтобы сгенерировать шаблоны в GCP

python template.py --runner DataflowRunner --project $PROJECT --staging_location gs://$BUCKET/staging --temp_location gs://$BUCKET/temp --
template_location gs://$BUCKET/templates/$TemplateName
...