Поток данных GCP - пропускная способность постепенно снижается, рабочие не используются - PullRequest
1 голос
/ 12 июля 2020

У меня есть сценарий Beam, работающий в GCP Dataflow. Этот поток данных выполняет следующие шаги:

  1. Чтение ряда файлов, зашифрованных PGP. (Общий размер более 100 ГБ, отдельные файлы имеют размер 2 ГБ)
  2. Расшифруйте файлы, чтобы сформировать коллекцию PCollection
  3. Выполните wait () на PCollection
  4. Сделайте некоторая обработка каждой записи в коллекции PCollection перед записью в выходной файл

Поведение, наблюдаемое с потоком данных GCP:

  1. При чтении входных файлов и дешифровании файлов запускается с одним рабочим, а затем масштабируется до 30 рабочих. Но только один воркер продолжает использоваться, загрузка всех остальных воркеров составляет менее 10%
  2. Первоначально при расшифровке пропускная способность составляла 150 тыс. Записей в секунду. Итак, 90% дешифровки выполняется за 1 час, и это хорошо. Но затем пропускная способность постепенно снижается, даже до 100 записей в секунду. Итак, для выполнения оставшихся 10% рабочей нагрузки требуется еще 1-2 часа.

Есть идеи, почему рабочие не используются? Если нет использования, почему они не уменьшаются? Здесь я без надобности плачу за большое количество виртуальных машин :-(. Во-вторых, почему пропускная способность замедляет сокращение к концу и тем самым значительно увеличивает время завершения?

Ответы [ 2 ]

1 голос
/ 13 июля 2020

Существует проблема, связанная с пропускной способностью и поведением ввода Cloud Dataflow. Я предлагаю вам отслеживать улучшения, вносимые в автомасштабирование и поведение рабочих процессов здесь .

Архитектура по умолчанию для обработки и автомасштабирования потока данных не так отзывчива в некоторых случаях по сравнению с тем, когда включена функция Dataflow Streaming Engine . Я бы порекомендовал вам попробовать запустить соответствующий конвейер потока данных с включенным Streaming Engine, поскольку он обеспечивает более высокую производительность автомасштабирования в зависимости от загрузки ЦП для вашего конвейера.

Надеюсь, вы найдете приведенную выше информацию полезной.

0 голосов
/ 13 июля 2020

Можете ли вы попробовать реализовать свое решение без ожидания ()?

Например, FileIO.match (). Filepattern () -> ParDo (DoFn для расшифровки файлов) -> fileIO.readmatches () - > ParDo (DoFn для чтения файлов)

См. Пример здесь .

Это должно позволить вашему конвейеру лучше распараллеливаться.

...