Как ускорить массовый импорт в облачное хранилище данных Google с несколькими работниками? - PullRequest
0 голосов
/ 07 мая 2018

У меня есть задание потока данных на основе Apache-Beam для чтения с использованием vcf source из одного текстового файла (хранящегося в облачном хранилище Google), преобразования текстовых строк в хранилище данных Entities и записи их в хранилище данных . Рабочий процесс работает нормально, но я заметил следующие минусы:

  • Скорость записи в хранилище данных составляет не более 25-30 объектов в секунду.
  • Я пытался использовать --autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100, но, похоже, для выполнения предпочтение отдается одному работнику (см. График ниже: целевых работников однажды увеличили до 2, но сократили до 1 "из-за возможности распараллелить работу на текущем шаге") .

image

Я не использовал путь предка для ключей; все сущности одинаковы kind.

Код конвейера выглядит следующим образом:

def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
  (p
   | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
   | 'Create my entity' >> beam.ParDo(
     ToEntityFn(), user_options.kind)
   | 'Write to datastore' >> WriteToDatastore(project))

Поскольку у меня есть миллионы строк для записи в хранилище данных, это заняло бы слишком много времени для записи со скоростью 30 объектов / сек.

Вопрос: вход представляет собой один огромный сжатый файл. Нужно ли разбивать его на несколько небольших файлов для запуска нескольких рабочих? Есть ли другой способ сделать импорт быстрее? Я что-то пропускаю в настройке num_workers? Спасибо!

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

Я посмотрел на дизайн vcfio . Я подозреваю (если я правильно понимаю), что причина, по которой я всегда получаю одного работника, когда ввод является одним файлом, связана с ограничением ограничения _VcfSource и VCF, Этот формат имеет часть заголовка, которая определяет, как переводить строки без заголовка. Это приводит к тому, что каждый работник, который читает исходный файл, должен работать со всем файлом. Когда я разделяю один файл на 5 отдельных файлов, имеющих один и тот же заголовок, я успешно получаю до 5 рабочих (но, скорее всего, по той же причине).

image

Одна вещь, которую я не понимаю, состоит в том, что число читающих работников может быть ограничено до 5 (в данном случае). Но почему у нас есть только 5 рабочих, чтобы написать? В любом случае, я думаю, что нашел альтернативный способ запуска нескольких рабочих с помощью луча Dataflow-Runner (используйте предварительно разделенные файлы VCF ). Существует также родственный подход в варианте преобразования gcp , в котором vcfio был значительно расширен. Кажется, для поддержки нескольких рабочих с одним входным файлом vcf. Я хотел бы, чтобы изменения в этом проекте могли быть объединены и в проекте луча.

0 голосов
/ 07 мая 2018

Я не знаком с Apache Beam, ответ с точки зрения общего потока.

Если предположить, что между данными сущностей в различных разделах входного файла нет никаких связей, то да, работа с несколькими входными файлами определенно должна помочь, поскольку все эти файлы могут обрабатываться практически параллельно (в зависимости, конечно, от максимального количество доступных работников).

Вам может не нужно заранее разбивать огромный zip-файл, возможно, можно будет просто передать сегменты единого потока входных данных, чтобы разделить рабочих сегментов данных для записи, если издержки самой такой передачи будут является незначительным по сравнению с фактической обработкой сегмента данных.

Общим ограничением производительности будет скорость чтения входных данных, разбиение их на сегменты и передача работникам данных сегментов.

Работник сегмента данных будет дополнительно разбивать полученный сегмент данных на более мелкие порции, эквивалентные максимум 500 объектам, которые можно преобразовать в объекты и записать в хранилище данных за одну пакетную операцию. В зависимости от используемой клиентской библиотеки хранилища данных может быть возможно выполнить эту операцию асинхронно, разрешая разделение на куски и преобразование в сущности, не дожидаясь завершения предыдущих записей хранилища данных.

Тогда ограничение производительности у работника сегмента данных будет равно скорости, с которой сегмент данных может быть разбит на куски, а порция преобразована в сущности

Если асинхронные операции недоступны или для еще более высокой пропускной способности, может быть выполнена еще одна передача каждого чанка работнику сегмента, при этом работник сегмента выполняет преобразование в сущности и пакетную запись хранилища данных.

Тогда ограничение производительности на рабочем уровне сегмента данных будет просто скоростью, с которой сегмент данных может быть разбит на куски и передан работникам чанка.

При таком подходе фактическое преобразование в сущности и пакетная запись их в хранилище данных (асинхронное или нет) больше не будут находиться на критическом пути разделения потока входных данных, который, я полагаю, ограничивает производительность в вашем текущем подход.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...