Избегайте пересчета размера всех файлов облачного хранилища в Beam Python SDK - PullRequest
1 голос
/ 26 марта 2020

Я работаю над конвейером, который читает ~ 5 миллионов файлов из каталога Google Cloud Storage (GCS). Он настроен для работы в облачном потоке данных Google.

Проблема заключается в том, что когда я запускаю конвейер, "часы" вычисляют размер всех файлов:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

Как вы можете видеть, для расчета размера примерно 5,5 млн. Файлов потребовалось полтора часа (5549 секунд), а затем все началось заново! Потребовалось еще 2 часа, чтобы запустить второй проход, затем он начал его в третий раз! На момент написания этой статьи работа по-прежнему недоступна в консоли Dataflow, поэтому я полагаю, что все это происходит на моей локальной машине и не использует какие-либо распределенные вычисления.

Когда я тестирую конвейер с меньшим входным набором данных (2 файла) повторяет оценку размера 4 раза:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

С такой скоростью потребуется около 8 часов, чтобы выполнить оценки размера GCS для всех файлов 5,5M 4 раза, все до того, как задание Dataflow было запущено.

Мой конвейер настроен с опцией --runner=DataflowRunner, поэтому он должен работать в потоке данных:

python bigquery_import.py --runner=DataflowRunner #other options...

Конвейер читает из GCS следующим образом :

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

Полный код приведен в bigquery_import.py на GitHub.

Я не понимаю, почему этот утомительный процесс происходит вне среды потока данных и почему это нужно сделать несколько раз. Я правильно читаю файлы из GCS или есть более эффективный способ?

1 Ответ

3 голосов
/ 27 марта 2020

Спасибо за сообщение об этом. Луч имеет два преобразования для чтения текста. ReadFromText и ReadAllFromText. ReadFromText столкнется с этой проблемой, но ReadAllFromText не должен.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

Недостатком ReadAllFromText является то, что он не будет выполнять динамически c работает перебалансировка, но это не должно быть проблемой при чтении большого количества файлов.

Создано https://issues.apache.org/jira/browse/BEAM-9620 для отслеживания проблем с ReadFromText (и источниками на основе файлов в целом ).

...