Файл перезаписывается после закрытия при запуске конвейера луча в DataFlow - PullRequest
0 голосов
/ 01 марта 2019

Я создал лучевой конвейер p для запуска в потоке данных и хочу записать что-то в файл перед запуском моего конвейера.Мой код:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time

pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()

time.sleep(1)

r = p.run()

Файл записывается правильно, но затем он перезаписывается и становится пустым, как только вызывается p.run().Кто-нибудь может объяснить, почему это происходит?

ПРИМЕЧАНИЯ:

  • Изменение имени файла и имени переменной файла не влияет на результат.
  • Я вставилtime.sleep(1), чтобы можно было просмотреть файл для записи до вызова p.run() и файл будет перезаписан на пустое.Это не обязательно и может быть изменено / удалено.

1 Ответ

0 голосов
/ 18 марта 2019

Проблема возникает из-за строки pipeline_options.view_as(SetupOptions).save_main_session = True.

Когда конвейер запускается, луч будет использовать dill.dump_session для сериализации основного сеанса и сохранения его в файл.Затем он будет использовать dill.load_session для загрузки этого же файла и десериализации его для воссоздания основного сеанса.Он снова выполнит повторную сериализацию основного сеанса, используя dill.dump_session для отправки бегуну.Причиной сериализации, десериализации и последующей повторной сериализации основного сеанса является исправление несоответствия в сериализации, как указано в https://github.com/uqfoundation/dill/issues/195. Это означает, что у всех участников будет эта проблема.

Основной сеансв этом случае содержит объект myFile3984573498534.Когда это десериализовано, он снова откроет файл так же, как вы открыли его изначально, используя режим w+.Это немедленно перезапишет файл.Затем этот файл закрывается, и конвейер заканчивается пустым файлом.

Лучшее решение для этого - открыть файл в режиме r+, чтобы файл открывался в режиме чтения во время десериализации основного файла.сеанс, в результате чего он не изменяется.

Если вам нужно , чтобы открыть файл в режиме w+, вы должны удалить переменную, хранящую файл после закрытия файла, то есть del(myFile3984573498534) после myFile3984573498534.close() , но до запуска конвейера.Это предотвращает сериализацию переменной, поскольку она больше не существует, в результате чего файл не изменяется.

...