Я пытаюсь загрузить большие файлы из http и загрузить их в gcs, используя apache-beam python sdk (2.6.0) в потоке данных.
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
gcs = GCSFileSystem(options)
logging.info("Writing to file {}".format(filename))
f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)
chunk_size = 512 * 1024
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
else:
break
if i % 10 == 0:
logging.info("Written {by} kb bytes into {filename}".format(
by=((i * chunk_size) / 1000), filename=filename))
logging.info("Closing file {}".format(filename))
f.close()
logging.info("Closed file {}".format(filename))
Этот подход хорошо работает для небольших файлов (~ kb), хотя я изо всех сил стараюсь заставить его работать для файлов большего размера (~ Gb).
Ведение журнала указывает, что оно застревает в f.close (), и в GCS еще не было записано ни одного файла.
Я покопался в коде, и кажется, что GCSFileSystem создает экземпляр GcsBufferedWriter, который сам записывает в многопроцессорный канал, который попадает в команду Transfer.Upload.
У меня нет особых намеков на то, что может вызвать эту проблему, я подозреваю, что соединения / каналы сбрасываются или отключаются в процессе (сервер http, с которого я загружаю, имеет очень низкую пропускную способность, и я распараллеливаюсь звонки с помощью gevent), или просто перевод. Загрузите с некоторыми проблемами производительности.
Когда я проверяю статистику машины, у меня намного больше входящего (20Mo / s), чем выходного трафика (200ko / s), в основном нет записи на диск, что заставляет меня задаться вопросом, куда отправляются все данные по конвейеру ..
Спасибо!