DirectRunner работает, а DataflowRunner — нет - PullRequest
1 голос
/ 18 марта 2019

У меня есть входной бакет GCS, в который поступают файлы; я преобразую их и загружаю результат в другой бакет GCS. Всё работает нормально, когда я использую DirectRunner, но при использовании DataflowRunner конвейер не запускается.

Вот как я запускаю конвейер с DataflowRunner:

python -m pipeline.pubsub2 --runner DataflowRunner --project my_project --temp_location gs://proj-temp/temp1/ --input_topic "projects/my_project/topics/testsub1" --output "gs://baker-sentimental-2/" --streaming --setup_file ./setup.py

Во время работы задания Dataflow я получаю следующие ошибки:

E  EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities 
E  Error initializing dynamic plugin prober: Error (re-)creating driver directory: mkdir /usr/libexec/kubernetes: read-only file system 
E  Image garbage collection failed once. Stats initialization may not have completed yet: failed to get imageFs info: unable to find data for container / 
E  [ContainerManager]: Fail to get rootfs information unable to find data for container / 
E  Handler for GET /v1.27/images/gcr.io/cloud-dataflow/v1beta3/harness:2.11.0/json returned error: No such image: gcr.io/cloud-dataflow/v1beta3/harness:2.11.0 
E  while reading 'google-dockercfg' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg 
E  while reading 'google-dockercfg-url' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg-url 
E  Handler for GET /v1.27/images/gcr.io/cloud-dataflow/v1beta3/python-fnapi:2.11.0/json returned error: No such image: gcr.io/cloud-dataflow/v1beta3/python-fnapi:2.11.0 
E  EXT4-fs (sdb): VFS: Can't find ext4 filesystem 
E  Missing required coder_id on grpc_port for -16; using deprecated fallback. 
E  Missing required coder_id on grpc_port for -12; using deprecated fallback. 

Это мой код:

import datetime
import logging
import json
import argparse
from datetime import datetime

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.io.filesystems import FileSystems

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

# Pipeline configuration.
# Every option is fixed here, at import time, before the Pipeline is created;
# run() below builds its Pipeline from this `options` object.
options = PipelineOptions()

# Ship the state of the main module along with the job.
options.view_as(SetupOptions).save_main_session = True

# Streaming job; the runner is set to 'DataflowRunner' here
# (use 'DirectRunner' instead for a local run).
options.view_as(StandardOptions).streaming = True
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Cloud project the job is attributed to.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'main_project'




def run(argv=None):
    """Build and run the streaming text-analysis pipeline.

    Reads messages from a Pub/Sub topic, opens the file each message points
    at, runs sentiment analysis on every element read from that file and
    writes each analysed element as a JSON document under the output location.

    Args:
        argv: Command-line arguments to parse; ``None`` lets argparse fall
            back to ``sys.argv``. Only ``--output`` and ``--input_topic`` are
            consumed here.

    NOTE(review): the pipeline is built from the module-level ``options``
    object; the arguments left over after parsing are not forwarded to it.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='GCS destination folder to save the images to (example: gs://BUCKET_NAME/path')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<project name>/topics/<topic name>".'))
    known_args, _ = parser.parse_known_args(argv)

    def read_lines(metadata):
        """Yield the contents of the file named by ``metadata``, then close it.

        ``metadata`` must provide the ``bucket`` and ``name`` keys
        (presumably a GCS notification payload -- confirm with the publisher).
        """
        path = 'gs://%s/%s' % (metadata['bucket'], metadata['name'])
        handle = FileSystems.open(path)
        try:
            for line in handle:
                yield line
        finally:
            # Release the handle even if a downstream step fails mid-file.
            handle.close()

    class ExtractHashtagsDoFn(beam.DoFn):
        def process(self, element):
            """Wrap the incoming text in a dict under the 'text' key."""
            result = {'text': element}
            logging.info('%s', result)
            yield result

    class SentimentAnalysisDoFn(beam.DoFn):
        def start_bundle(self):
            """Create the Natural Language client once per bundle."""
            # Imported here, not at module level, as in the original code.
            from google.cloud import language

            self._client = language.LanguageServiceClient()

        def process(self, element):
            """Add sentiment score and magnitude to ``element`` and emit it."""
            from google.cloud import language

            document = language.types.Document(
                content=element['text'].encode('utf-8'),
                type='PLAIN_TEXT')
            response = self._client.analyze_sentiment(
                document=document,
                encoding_type='UTF8')

            sentiment = response.document_sentiment
            element['sentiment_score'] = sentiment.score
            element['sentiment_magnitude'] = sentiment.magnitude
            yield element

    class WriteToGCS(beam.DoFn):
        def __init__(self, outdir):
            self.outdir = outdir

        def process(self, element):
            """Serialize ``element`` to JSON and write it to its own file.

            Emits nothing downstream.
            """
            import uuid

            payload = json.dumps(element).encode('utf-8')
            source_date = datetime.now().strftime("%Y%m%d-%H%M%S")
            # The timestamp only has one-second resolution, so a random
            # suffix keeps elements written in the same second from
            # overwriting each other.
            filename = 'output%s-%s.txt' % (source_date, uuid.uuid4().hex)
            # Join instead of concatenating so that an output folder given
            # without a trailing slash still resolves to a path inside it.
            target = FileSystems.join(self.outdir, filename)
            writer = FileSystems.create(target, 'text/plain')
            try:
                writer.write(payload)
            finally:
                writer.close()

    # Define Composite Transforms
    class TextAnalysisTransform(beam.PTransform):
        def expand(self, pcoll):
            """Decode, analyse and store each element of ``pcoll``."""
            # NOTE(review): 'Save file' is the last step and WriteToGCS emits
            # no elements, so the PCollection returned here is empty.
            return (
                pcoll
                | 'Decode' >> beam.Map(lambda string: string.decode('utf8', 'ignore'))
                | 'ExtractHashtags' >> beam.ParDo(ExtractHashtagsDoFn())
                | 'SentimentAnalysis' >> beam.ParDo(SentimentAnalysisDoFn())
                | 'Save file' >> beam.ParDo(WriteToGCS(known_args.output))
            )

    class WindowingForOutputTransform(beam.PTransform):
        def expand(self, pcoll):
            """Group elements in fixed windows and emit each distinct key."""
            return (
                pcoll
                | 'Pack' >> beam.Map(lambda x: (x, 1))
                | 'Windowing' >> beam.WindowInto(window.FixedWindows(5, 0))
                | 'GroupByKey' >> beam.GroupByKey()
                | 'Unpack' >> beam.Map(lambda x: x[0])
            )

    class SaveToBigQueryTransform(beam.PTransform):
        def expand(self, pcoll):
            """Window ``pcoll`` and append its rows to the BigQuery table."""
            from apache_beam.io.gcp.internal.clients import bigquery

            # Every column is nullable; only name and type differ.
            columns = (
                ('text', 'string'),
                ('sentiment_score', 'float'),
                ('sentiment_magnitude', 'float'),
            )
            table_schema = bigquery.TableSchema()
            for column_name, column_type in columns:
                field = bigquery.TableFieldSchema()
                field.name = column_name
                field.type = column_type
                field.mode = 'nullable'
                table_schema.fields.append(field)

            return (
                pcoll
                | 'PrepareForOutput' >> WindowingForOutputTransform()
                | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                    table='test',
                    dataset='testimport',
                    project='main_project',
                    schema=table_schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    # Leaving the `with` block runs the pipeline and waits for it to finish,
    # so there must be no additional p.run() afterwards: that would submit
    # the job a second time.
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'LoadTestData' >> beam.io.ReadFromPubSub(
                topic=known_args.input_topic).with_output_types(bytes)
            | 'decode' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
            | 'read_file' >> beam.FlatMap(read_lines)
            | 'TextAnalysis' >> TextAnalysisTransform()
            | 'StreamToBigQuery' >> SaveToBigQueryTransform()
        )

if __name__ == '__main__':
    # Let INFO-level records through the root logger, then start the pipeline.
    logging.root.setLevel(logging.INFO)
    run()
...