Ошибка при передаче аргумента в задание потока данных для остальных API-функций облака - PullRequest
0 голосов
/ 23 апреля 2020

Кто-нибудь может поделиться кодом потока данных python, чтобы принять параметры? Я сталкиваюсь с той же проблемой с аргументом, проходящим через остальные API. мой код df ниже: -

def run(argv=None):

    parser = argparse.ArgumentParser()

    # Specifically we have the input file in CSV format to read and the output BQ table to write.
    # This is the final stage of the pipeline, where we define the destination

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://intient_output/measurementunit.csv')

    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to be written. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='mygcpdataengineerlab:intientpoc.measurementunit'
        )

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)
    data_ingestion = DataIngestion()
    project = ''

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

трассировка стека исключений ниже

Error-    response = request.execute()
  File "/env/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/env/local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/mygcpdataengineerlab/templates:launch?gcsPath=gs%3A%2F%2Fgcp_dataflow_csv_bq_code%2Ftemplates&alt=json returned "(9744cfd1809f74a): The workflow could not be created. Causes: (9744cfd1809fa2d): Found unexpected parameters: ['input' (perhaps you meant 'update'), 'output' (perhaps you meant 'job_port')]">

Ответы [ 2 ]

0 голосов
/ 30 апреля 2020

Я получил решение, приняв arg в качестве параметров конвейера. class MyPipeOpt (PipelineOptions): @classmethod def _add_argparse_args (cls, parser): parser.add_value_provider_argument ('- input', help = 'Входной файл для чтения. Это может быть локальный файл или файл в Google Storage Bucket.' . .

Спасибо,

0 голосов
/ 23 апреля 2020

Проблема с dest=input, так как этот аргумент ключевого слова не поддерживается в последних опциях луча. Для простоты вы можете сделать что-то вроде этого:

def run(argv=None, save_main_session=True):
  """Build and run the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output_topic', required=True,
      help=('Output PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))

  parser.add_argument(
      '--input_topic',
      help=('Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
...