Beam GCSFileSystem / GcsBufferedWriter производительность - PullRequest
0 голосов
/ 30 августа 2018

Я пытаюсь загрузить большие файлы из http и загрузить их в gcs, используя apache-beam python sdk (2.6.0) в потоке данных.

            from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
            gcs = GCSFileSystem(options)
            logging.info("Writing to file {}".format(filename))
            f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)
            chunk_size = 512 * 1024
            for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
                if chunk:
                    f.write(chunk)
                else:
                    break
                if i % 10 == 0:
                    logging.info("Written {by} kb bytes into {filename}".format(
                        by=((i * chunk_size) / 1000), filename=filename))
            logging.info("Closing file {}".format(filename))
            f.close()
            logging.info("Closed file {}".format(filename))

Этот подход хорошо работает для небольших файлов (~ kb), хотя я изо всех сил стараюсь заставить его работать для файлов большего размера (~ Gb).

Ведение журнала указывает, что оно застревает в f.close (), и в GCS еще не было записано ни одного файла. Я покопался в коде, и кажется, что GCSFileSystem создает экземпляр GcsBufferedWriter, который сам записывает в многопроцессорный канал, который попадает в команду Transfer.Upload.

У меня нет особых намеков на то, что может вызвать эту проблему, я подозреваю, что соединения / каналы сбрасываются или отключаются в процессе (сервер http, с которого я загружаю, имеет очень низкую пропускную способность, и я распараллеливаюсь звонки с помощью gevent), или просто перевод. Загрузите с некоторыми проблемами производительности.

Когда я проверяю статистику машины, у меня намного больше входящего (20Mo / s), чем выходного трафика (200ko / s), в основном нет записи на диск, что заставляет меня задаться вопросом, куда отправляются все данные по конвейеру ..

Спасибо!

1 Ответ

0 голосов
/ 10 сентября 2018

Тем временем я перешел на использование Python-клиента Google Storage, который действительно, кажется, имеет лучшую производительность и работает, как ожидалось. Хотя я наблюдаю некоторые временные ошибки

File "dataflow/make_training_chips.py", line 93, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 71, in __init__
_http=_http)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 215, in __init__
_ClientProjectMixin.__init__(self, project=project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 169, in __init__
project = self._determine_default(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 182, in _determine_default
return _determine_default_project(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/_helpers.py", line 179, in _determine_default_project
_, project = google.auth.default()
File "/usr/local/lib/python2.7/dist-packages/google/auth/_default.py", line 306, in default
  raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
  DefaultCredentialsError: Could not automatically determine credentials. 
  Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials 
  and re-run the application. For more information, please see 
  https://developers.google.com/accounts/docs/application-default-credentials. 
  [while running 'ParDo(GenerateTraingChips)']

, что часто случается с моими работниками (хотя я повторяю задание с экспоненциальным откатом ...).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...