У меня есть входной бакет GCS, в который поступают файлы; я преобразую их и загружаю в другой бакет GCS. Конвейер работает нормально, когда я использую DirectRunner, но при использовании DataflowRunner он не запускается.
Вот как я запускаю конвейер с DataflowRunner:
python -m pipeline.pubsub2 --runner DataflowRunner --project my_project --temp_location gs://proj-temp/temp1/ --input_topic "projects/my_project/topics/testsub1" --output "gs://baker-sentimental-2/" --streaming --setup_file ./setup.py
Во время работы задания Dataflow я получаю следующие ошибки:
E EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities
E Error initializing dynamic plugin prober: Error (re-)creating driver directory: mkdir /usr/libexec/kubernetes: read-only file system
E Image garbage collection failed once. Stats initialization may not have completed yet: failed to get imageFs info: unable to find data for container /
E [ContainerManager]: Fail to get rootfs information unable to find data for container /
E Handler for GET /v1.27/images/gcr.io/cloud-dataflow/v1beta3/harness:2.11.0/json returned error: No such image: gcr.io/cloud-dataflow/v1beta3/harness:2.11.0
E while reading 'google-dockercfg' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg
E while reading 'google-dockercfg-url' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg-url
E Handler for GET /v1.27/images/gcr.io/cloud-dataflow/v1beta3/python-fnapi:2.11.0/json returned error: No such image: gcr.io/cloud-dataflow/v1beta3/python-fnapi:2.11.0
E EXT4-fs (sdb): VFS: Can't find ext4 filesystem
E Missing required coder_id on grpc_port for -16; using deprecated fallback.
E Missing required coder_id on grpc_port for -12; using deprecated fallback.
Это мой код:
import datetime
import logging
import json
import argparse
from datetime import datetime
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
# Configuration
# Every pipeline option is fixed here, at import time, before the Pipeline
# object is created inside run().
options = PipelineOptions()

# Google Cloud project the job is attached to.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'main_project'

# Ask Beam to save the main session alongside the pipeline.
options.view_as(SetupOptions).save_main_session = True

# Streaming job on the Dataflow runner (use 'DirectRunner' here instead for
# a local run).
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
def run(argv=None):
    """Build and run the pipeline.

    The pipeline reads GCS notification messages from a Pub/Sub topic, opens
    the object each message points at, runs sentiment analysis on every line,
    writes one JSON file per line to the ``--output`` folder and forwards the
    same records to BigQuery.

    Args:
        argv: Command-line arguments to parse; ``None`` makes argparse fall
            back to ``sys.argv``. Only ``--output`` and ``--input_topic`` are
            consumed here. The pipeline itself is configured by the
            module-level ``options`` object.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='GCS destination folder to save the images to (example: gs://BUCKET_NAME/path')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<project name>/topics/<topic name>".'))
    # Flags not declared above are not needed in this function.
    known_args, _ = parser.parse_known_args(argv)

    def read_lines(metadata):
        """Yield each line of the GCS object named by a notification dict.

        ``metadata`` must provide the 'bucket' and 'name' keys. The handle is
        closed once the object has been fully read (or reading fails).
        """
        handle = FileSystems.open(
            'gs://%s/%s' % (metadata['bucket'], metadata['name']))
        try:
            for line in handle:
                yield line
        finally:
            handle.close()

    # DoFn
    class ExtractHashtagsDoFn(beam.DoFn):
        """Wrap each incoming text in a dict under the 'text' key."""

        def process(self, element):
            """Log and emit ``{'text': element}``."""
            result = {'text': element}
            logging.info('%s', result)
            yield result

    class SentimentAnalysisDoFn(beam.DoFn):
        """Add the sentiment score and magnitude of element['text']."""

        # Created lazily on the worker and reused, instead of building a new
        # client for every single element.
        _client = None

        def start_bundle(self):
            """Create the language client once per DoFn instance."""
            if self._client is None:
                from google.cloud import language
                self._client = language.LanguageServiceClient()

        def process(self, element):
            """Emit ``element`` with 'sentiment_score'/'sentiment_magnitude' set."""
            from google.cloud import language
            if self._client is None:
                # Defensive: covers runners that skip start_bundle.
                self._client = language.LanguageServiceClient()
            document = language.types.Document(
                content=element['text'].encode('utf-8'),
                type='PLAIN_TEXT')
            response = self._client.analyze_sentiment(
                document=document,
                encoding_type='UTF8')
            sentiment = response.document_sentiment
            element['sentiment_score'] = sentiment.score
            element['sentiment_magnitude'] = sentiment.magnitude
            yield element

    class WriteToGCS(beam.DoFn):
        """Write every element as its own JSON text file under ``outdir``."""

        def __init__(self, outdir):
            self.outdir = outdir

        def process(self, element):
            """Write ``element`` as JSON and pass it on downstream.

            The file name carries a timestamp plus a random suffix: the
            timestamp alone only has one-second resolution, so elements
            processed within the same second would overwrite each other.
            """
            import uuid
            payload = json.dumps(element).encode('utf-8')
            source_date = datetime.now().strftime("%Y%m%d-%H%M%S")
            path = '%soutput%s-%s.txt' % (
                self.outdir, source_date, uuid.uuid4().hex)
            writer = FileSystems.create(path, 'text/plain')
            try:
                writer.write(payload)
            finally:
                # Always release the handle, even when the write fails.
                writer.close()
            # Re-emit the element: without this the transform outputs nothing
            # and the BigQuery branch downstream never receives any data.
            yield element

    # Define Composite Transforms
    class TextAnalysisTransform(beam.PTransform):
        """Decode lines, analyse their sentiment and save each result to GCS."""

        def expand(self, pcoll):
            return (
                pcoll
                | 'Decode' >> beam.Map(lambda string: string.decode('utf8', 'ignore'))
                | 'ExtractHashtags' >> beam.ParDo(ExtractHashtagsDoFn())
                | 'SentimentAnalysis' >> beam.ParDo(SentimentAnalysisDoFn())
                | 'Save file' >> beam.ParDo(WriteToGCS(known_args.output))
            )

    class WindowingForOutputTransform(beam.PTransform):
        """Group elements into fixed windows, emitting each distinct one once."""

        def expand(self, pcoll):
            return (
                pcoll
                # Use the element itself as the key so that GroupByKey yields
                # one entry per distinct element and window.
                | 'Pack' >> beam.Map(lambda x: (x, 1))
                | 'Windowing' >> beam.WindowInto(window.FixedWindows(5, 0))
                | 'GroupByKey' >> beam.GroupByKey()
                | 'Unpack' >> beam.Map(lambda x: x[0])
            )

    class SaveToBigQueryTransform(beam.PTransform):
        """Append the analysed records to the BigQuery table testimport.test."""

        def expand(self, pcoll):
            # Define the Schema.
            from apache_beam.io.gcp.internal.clients import bigquery
            table_schema = bigquery.TableSchema()
            # All fields use standard types and are nullable.
            columns = (
                ('text', 'string'),
                ('sentiment_score', 'float'),
                ('sentiment_magnitude', 'float'),
            )
            for column_name, column_type in columns:
                field_schema = bigquery.TableFieldSchema()
                field_schema.name = column_name
                field_schema.type = column_type
                field_schema.mode = 'nullable'
                table_schema.fields.append(field_schema)
            # Saving the output to BigQuery.
            return (
                pcoll
                | 'PrepareForOutput' >> WindowingForOutputTransform()
                | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                    table='test',
                    dataset='testimport',
                    project='main_project',
                    schema=table_schema,  # Pass the defined table_schema
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    # Leaving the with-block runs the pipeline and waits for it to finish, so
    # no explicit p.run() follows: calling it as well would execute the
    # pipeline a second time.
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'LoadTestData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
            | 'decode' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
            | 'read_file' >> beam.FlatMap(read_lines)
            | 'TextAnalysis' >> TextAnalysisTransform()
            | 'StreamToBigQuery' >> SaveToBigQueryTransform()
        )
if __name__ == '__main__':
    # Show INFO-level messages on the root logger, then start the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()