Ошибка атрибута при создании пользовательского шаблона с использованием python в Google Cloud DataFlow - PullRequest
0 голосов
/ 24 января 2020

Возникла проблема при создании пользовательского шаблона для Cloud Dataflow. это простой код, который берет данные из корзины ввода и загружает их в BigQuery. Мы хотим загрузить много таблиц, поэтому пытаемся создать собственный шаблон. как только это сработает, следующим шагом будет передача набора данных также в качестве параметра.

Сообщение об ошибке:

AttributeError: у объекта 'StaticValueProvider' нет атрибута 'datasetId'

Код

class ContactUploadOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    path and organization parameters are necessary for execution of pipeline
    campaign is optional for committing to bigquery
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='Path of the file to read from'
            )
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Output BQ table for the pipeline')


def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    global PROJECT
    from google.cloud import bigquery


    # Retrieve project Id and append to PROJECT form GoogleCloudOptions

    # Initialize runtime parameters as object
    contact_options = PipelineOptions().view_as(ContactUploadOptions)
    PROJECT = PipelineOptions().view_as(GoogleCloudOptions).project
    client = bigquery.Client(project=PROJECT)
    dataset = client.dataset('pharma')    
    data_ingestion = DataIngestion()
    pipeline_options = PipelineOptions()
    # Save main session state so pickled functions and classes
    # defined in __main__ can be unpickled
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # Parse arguments from command line.
    #data_ingestion = DataIngestion()

    # Instantiate pipeline
    options = PipelineOptions()
    p = beam.Pipeline(options=options)
    (p
     | 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
                beam.io.BigQuerySink(
                    contact_options.output,
                    schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
     )

Моя команда следующая:

python3 -m pharm_template --runner DataflowRunner  --project jupiter-120  --staging_location gs://input-cdc/temp/staging  --temp_location gs://input-cdc/temp/   --template_location gs://code-cdc/temp/templates/jupiter_pipeline_template

Что я пробовал:

Я пытался передать --input и --output Я тоже пробовал --experiment=use_beam_bq_sink но безрезультатно. Я также попытался передать datasetID

datasetId = StaticValueProvider(str, 'pharma')

, но не повезло. Если кто-то создал шаблон, который загружается в BQ, тогда я могу взять реплику и исправить эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...