У меня есть пример здесь , где я записываю изображения в кодировке b64 в GCS, используя apache_beam.io.filesystems.FileSystems
.Последний шаг конвейера принимает b64
в качестве PCollection, содержащего два поля key_id
и image
, и применяет ParDo:
b64 | 'Save images' >> beam.ParDo(WriteToSeparateFiles(known_args.output))
, где known_args.output
- базовый путь GCS и WriteToSeparateFiles
выглядит следующим образом:
class WriteToSeparateFiles(beam.DoFn):
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
writer = filesystems.FileSystems.create(self.outdir + element['key_id'] + '.png')
writer.write(element['image'])
writer.close()
С filesystems.FileSystems.create()
я могу контролировать путь назначения.Для базового пути я использую параметр, который мы передали функции, и использую key_id
каждого элемента для генерации значимых имен файлов.Наконец, я добавляю расширение .png
при записи изображений.
Я использую writer.write(element['image'])
для сохранения содержимого поля image
для каждого файла и закрываю поток с помощью writer.close()
.