Dataflow: "Error from SDK harness for instruction"
1 vote
09 March 2019

I have a GCS bucket that receives new files every minute, so I use the Pub/Sub approach to pick up the new files, run some transformations, and save the results to another bucket. But I keep getting:

"Error from SDK harness for instruction"
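
For context, the new-file messages come from a Cloud Storage notification on the input bucket. The real names are masked here, so the command below is only a minimal sketch of such a configuration with placeholder names (the testsub1 topic name is taken from the argument help text in the code below):

# Sketch of the assumed notification setup: publish JSON-formatted
# OBJECT_FINALIZE events for the input bucket to the Pub/Sub topic
# that the pipeline subscribes to.
gsutil notification create -t testsub1 -f json -e OBJECT_FINALIZE gs://<input-bucket>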

This is my code:

from __future__ import absolute_import
import os
import logging
import argparse
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from datetime import datetime
import apache_beam as beam 
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText

def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_filename', required=True,
        help=('Output GCS bucket '
            '"gs:/***********/output_files".'))
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
            '"projects/*******/topics/testsub1".'))
    group.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
            '"projects/*********/*****/test_subscription."'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFns in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)


    # Read from PubSub into a PCollection.
    if known_args.input_subscription:
        messages = (p
                    | beam.io.ReadFromPubSub(
                        subscription=known_args.input_subscription)
                    .with_output_types(bytes))
    else:
        messages = (p
                    | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                    .with_output_types(bytes))

    lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

    class Split(beam.DoFn):
        """Splits each message on commas and runs Cloud Natural Language
        sentiment analysis on every field, emitting (text, score) pairs."""
        def process(self, element):
            element = element.rstrip("\n").encode('utf-8')
            text = element.split(',')
            result = []
            for i in range(len(text)):
                dat = text[i]
                # Analyze the sentiment of this field with the Natural Language API.
                client = language.LanguageServiceClient()
                document = types.Document(content=dat, type=enums.Document.Type.PLAIN_TEXT)
                sent_analysis = client.analyze_sentiment(document=document)
                sentiment = sent_analysis.document_sentiment
                data = [
                    (dat, sentiment.score)
                ]
                result.append(data)
            return result

    class WriteToCSV(beam.DoFn):
        """Formats each (text, score) pair as a single CSV line."""
        def process(self, element):
            return [
                "{},{}".format(
                    element[0][0],
                    element[0][1]
                )
            ]

    Transform = (lines
                | 'split' >> beam.ParDo(Split())
                | beam.ParDo(WriteToCSV()) 
                | beam.io.WriteToText(known_args.output_filename)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I launch it with the following command:

python SentAnal.py \
    --runner DataflowRunner \
    --project b********** \
    --temp_location gs://baker********/tmp/ \
    --input_topic "projects/******/topics/*****" \
    --output_filename "gs://*********/********" \
    --streaming \
    --experiment=allow_non_updatable_job

...