Я работаю над конвейером, который читает ~ 5 миллионов файлов из каталога Google Cloud Storage (GCS). Он настроен для работы в облачном потоке данных Google.
Проблема заключается в том, что когда я запускаю конвейер, "часы" вычисляют размер всех файлов:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
Как вы можете видеть, для расчета размера примерно 5,5 млн. Файлов потребовалось полтора часа (5549 секунд), а затем все началось заново! Потребовалось еще 2 часа, чтобы запустить второй проход, затем он начал его в третий раз! На момент написания этой статьи работа по-прежнему недоступна в консоли Dataflow, поэтому я полагаю, что все это происходит на моей локальной машине и не использует какие-либо распределенные вычисления.
Когда я тестирую конвейер с меньшим входным набором данных (2 файла) повторяет оценку размера 4 раза:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
С такой скоростью потребуется около 8 часов, чтобы выполнить оценки размера GCS для всех файлов 5,5M 4 раза, все до того, как задание Dataflow было запущено.
Мой конвейер настроен с опцией --runner=DataflowRunner
, поэтому он должен работать в потоке данных:
python bigquery_import.py --runner=DataflowRunner #other options...
Конвейер читает из GCS следующим образом :
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
Полный код приведен в bigquery_import.py на GitHub.
Я не понимаю, почему этот утомительный процесс происходит вне среды потока данных и почему это нужно сделать несколько раз. Я правильно читаю файлы из GCS или есть более эффективный способ?