Я пытаюсь прочитать метаданные из Pub/Sub. Сообщение читается успешно, после чего я пытаюсь передать имя бакета и имя файла, чтобы открыть файл в GCS и выполнить над ним какую-либо операцию, но у меня это не получается.
from __future__ import absolute_import
import argparse
import logging
from past.builtins import unicode
import json
#from google.cloud import language
#from google.cloud.language import enums
#from google.cloud.language import types
import apache_beam as beam
import apache_beam.transforms.window as window
import re
from builtins import object
from past.builtins import unicode
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileSystem
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
'--output_topic', required=True,
help=('Output PubSub topic of the form '
group = parser.add_mutually_exclusive_group(required=True)
help=('Input PubSub topic of the form '
help=('Input PubSub subscription of the form '
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
# Read from PubSub into a PCollection.
if known_args.input_subscription:
messages = (p
| beam.io.ReadFromPubSub(
messages = (p
| beam.io.ReadFromPubSub(topic=known_args.input_topic)
def print_row(row):
file_metadata_pcoll = (messages | 'decode' >> beam.Map(lambda x:
# Open the GCS object named by each decoded notification and print its content.
# The class is spelled `FileSystems` (capital F and capital S) in the
# `apache_beam.io.filesystems` module; the previous spelling `Filesystems`
# does not exist there, which is what raised the AttributeError.
# FlatMap iterates over whatever the callable returns, so the elements passed
# to `print_row` are the items produced by iterating the opened file object
# (presumably its lines as bytes -- TODO confirm against the Beam version used).
# NOTE(review): the handle returned by `open` is never closed explicitly here.
lines = (
    file_metadata_pcoll
    | 'read_file' >> beam.FlatMap(
        lambda metadata: beam.io.filesystems.FileSystems.open(
            'gs://%s/%s' % (metadata['bucket'], metadata['name'])))
    | "print" >> beam.Map(print_row))
result = p.run()
if __name__ == '__main__':
Я получаю такую ошибку:
ERROR:root:Exception at bundle
<apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f96cd9d5d08>, due to an exception.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 727, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 419, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/home/g9192gks/baker-template/pipeline/pubsub.py", line 79, in <lambda>
lines = (file_metadata_pcoll | 'read_file' >> beam.FlatMap(lambda metadata: beam.io.filesystems.Filesystems.open('gs://%s/%s' % (metadata['bucket'], metadata['name'])))
AttributeError: module 'apache_beam.io.filesystems' has no attribute 'Filesystems'
Я попытался импортировать модуль несколькими способами:
from apache_beam.io.filesystem import FileSystem also
from apache_beam.io.filesystem import FileSystems also
from apache_beam.io.filesystems import FileSystems
#and i also changed
lines = (file_metadata_pcoll | 'read_file' >> beam.FlatMap(lambda metadata: beam.io.filesystems.Filesystems.open('gs://%s/%s' % (metadata['bucket'], metadata['name'])))
lines = (file_metadata_pcoll | 'read_file' >> beam.FlatMap(lambda metadata: beam.io.filesystem.Filesystem.open('gs://%s/%s' % (metadata['bucket'], metadata['name'])))
но ни один из этих вариантов не сработал.