как написать в GCS с ParDo и DoFn в Apache Beam - PullRequest
0 голосов
/ 16 марта 2019

Использование apache_beam.io.filesystems.FileSystems как писать в GCS с ParDo и DoFn ?? Я уже получаю вывод в формате csv от pardo. Нужно ли мне писать другое pardo для записи в gcs или я могу напрямую импортировать модуль для записи его непосредственно в gcs? пожалуйста помогите

1 Ответ

1 голос
/ 16 марта 2019

У меня есть пример здесь , где я записываю изображения в кодировке b64 в GCS, используя apache_beam.io.filesystems.FileSystems.Последний шаг конвейера принимает b64 в качестве PCollection, содержащего два поля key_id и image, и применяет ParDo:

b64 | 'Save images' >> beam.ParDo(WriteToSeparateFiles(known_args.output))

, где known_args.output - базовый путь GCS и WriteToSeparateFilesвыглядит следующим образом:

class WriteToSeparateFiles(beam.DoFn):
    def __init__(self, outdir):
        self.outdir = outdir
    def process(self, element):
        writer = filesystems.FileSystems.create(self.outdir + element['key_id'] + '.png')
        writer.write(element['image'])
        writer.close()

С filesystems.FileSystems.create() я могу контролировать путь назначения.Для базового пути я использую параметр, который мы передали функции, и использую key_id каждого элемента для генерации значимых имен файлов.Наконец, я добавляю расширение .png при записи изображений.

Я использую writer.write(element['image']) для сохранения содержимого поля image для каждого файла и закрываю поток с помощью writer.close().

...