Ошибка разбора аргументов при запуске задания Dataflow - PullRequest
0 голосов
/ 11 марта 2019

Я написал этот конвейер Apache Beam, чтобы выполнять преобразование данных из входного источника в облаке (Pub/Sub) в выходное хранилище в облаке (GCS). Я использую Pub/Sub. Когда я пытаюсь выполнить свой код, я получаю ошибку разбора аргументов. Пожалуйста, помогите мне с этой ошибкой. Правильно ли написан мой код?

from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def run(argv=None):
  """Build and run the streaming sentiment pipeline.

  Reads messages from Pub/Sub, using either a topic or a subscription but
  never both. Each message is split on commas, and every field is scored
  with ``language.LanguageServiceClient().analyze_sentiment``. The
  ``(field, score)`` pairs are grouped into 15-second fixed windows and
  written as ``"field,score"`` text lines under ``--output_filename``.

  Args:
    argv: Command-line arguments. ``None`` means ``sys.argv[1:]``. Arguments
      not recognized here are forwarded to Beam as pipeline options.

  Raises:
    SystemExit: raised by argparse when ``--output_filename`` is missing, or
      when zero or both of ``--input_topic`` / ``--input_subscription`` are
      given.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output_filename', required=True,
      help=('Output file prefix on Google Cloud Storage of the form '
            '"gs://*******/*******".'))
  # Exactly one input source must be supplied. Passing both flags makes
  # argparse exit with
  # "argument --input_subscription: not allowed with argument --input_topic".
  group = parser.add_mutually_exclusive_group(required=True)
  group.add_argument(
      '--input_topic',
      help=('Input PubSub topic of the form '
            '"projects/*******/topics/*******". '
            'Mutually exclusive with --input_subscription.'))
  group.add_argument(
      '--input_subscription',
      help=('Input PubSub subscription of the form '
            '"projects/*******/subscriptions/*******". '
            'Mutually exclusive with --input_topic.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  # save_main_session ships the main module's globals to the workers. The
  # DoFns below rely on module-level names (presumably `language`, `types`,
  # `enums`) being available there.
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  pipeline_options.view_as(StandardOptions).streaming = True
  p = beam.Pipeline(options=pipeline_options)

  # Read raw message payloads (bytes) from PubSub.
  if known_args.input_subscription:
    messages = (p
                | beam.io.ReadFromPubSub(
                    subscription=known_args.input_subscription)
                .with_output_types(bytes))
  else:
    messages = (p
                | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                .with_output_types(bytes))

  # From here on every element is a decoded text string.
  lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

  class Split(beam.DoFn):
    """Split a comma-separated line and score the sentiment of each field.

    Emits one ``(field, score)`` tuple per comma-separated field.
    """

    # Client is created lazily inside process(), i.e. after the DoFn has been
    # shipped to the worker. It is then reused for every field and element
    # handled by this instance instead of being rebuilt per field.
    _client = None

    def process(self, element):
      if self._client is None:
        # NOTE(review): `language`, `types` and `enums` are not imported in
        # the visible source (presumably google.cloud.language). Confirm,
        # otherwise this raises NameError on the workers.
        self._client = language.LanguageServiceClient()
      # Keep the element as text. Encoding it to bytes here would make
      # split(',') raise TypeError on Python 3.
      for field in element.rstrip('\n').split(','):
        document = types.Document(
            content=field, type=enums.Document.Type.PLAIN_TEXT)
        sentiment = self._client.analyze_sentiment(
            document=document).document_sentiment
        yield field, sentiment.score

  class WriteToCSV(beam.DoFn):
    """Format a ``(text, score)`` tuple from Split as one "text,score" line."""

    def process(self, element):
      text, score = element
      yield '{},{}'.format(text, score)

  # NOTE(review): confirm that WriteToText accepts an unbounded (streaming)
  # PCollection with the SDK version and runner in use. If it does not, a
  # windowed file sink is needed; newer SDKs provide
  # apache_beam.io.fileio.WriteToFiles.
  (lines
   | 'split' >> beam.ParDo(Split())
   # Fixed, non-overlapping 15-second windows with zero offset.
   | beam.WindowInto(window.FixedWindows(15, 0))
   | 'CSV formatting' >> beam.ParDo(WriteToCSV())
   | 'transfer to output GCS' >> beam.io.WriteToText(
       known_args.output_filename))

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  # Script entry point: lower the root logger's threshold to INFO, then
  # build and run the pipeline from the command-line arguments.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run(argv=None)

(env) ****@cloudshell:~/template (*********)$ python -m pipeline.test2 --output_filename gs://*******/output_files --input_topic projects/*******/topics/******* --input_subscription projects/*******/subscriptions/*******
usage: test2.py [-h] --output_filename OUTPUT_FILENAME (--input_topic INPUT_TOPIC | --input_subscription INPUT_SUBSCRIPTION)
test2.py: error: argument --input_subscription: not allowed with argument --input_topic

...