Перезапись в S3 Bucket из EMR занимает много времени - PullRequest
2 голосов
/ 03 августа 2020

Я использую EMR (5.30) с искрой и выбрал oop в качестве выбранного приложения. Условия следующие:

  • Исторические данные находятся в расположении S3 (700 ГБ)
  • Ежедневные инкрементные данные (500 МБ)
  • Мне нужно создать слияние структура, основанная на некоторых условиях
  • Запишите результат обратно в S3 и завершите кластер

Что я делаю -

  • раскрутка кластера с 1 m5.8xlarge (мастер) и 5 ​​m5.8xlarge (ядро) из лямбда со всеми конфигурациями
  • конфигураций - 29 экземпляров исполнителей, 5 ядер исполнителей, 18 ГБ памяти / исполнитель. 3 ГБ служебной памяти, параллелизм по умолчанию 290
  • в скрипте PySpark:
    • копирование исторических данных после некоторых операций, таких как приведение типа данных, а затем во временное местоположение (S3) - перезапись
    • добавление инкрементных данных в временное местоположение (такое же временное местоположение) - добавление
    • выполнение всех необходимых преобразований
    • возвращение результата в другое местоположение S3 - перезапись (в том же ведре, но в другой папке) откуда на следующий день тот же цикл будет продолжен

Наблюдения:

  • Этот окончательный процесс перезаписи занимает более часа, но первый занимает только 14-15 мин.
  • Я пробовал использовать повторное разделение и объединение, никаких улучшений
  • При попытке с s3n.multipart.uploads.enabled установлено значение false, данная ошибка включает его
  • Согласно документации Amazon https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html, у него есть ограничение только на 1000 частей в списке. Таким образом, размер файла составляет в среднем 600-700 МБ, а общий выходной размер составляет около 700 ГБ. Итак, всякий раз, когда я пытался переопределить значение no. с переразбивкой этого фактически не могло произойти
  • , если я использую partionBy logi c в коде при перезаписи, это не имеет смысла, так как это занимает больше времени (более 2 часов). Также на следующий день мне нужно перезаписать результат.
  • еще одно наблюдение, при перезаписи, он сначала удалит содержимое папки, затем удалит папку и воссоздает ее, чтобы поместить набор результатов из EMR. В этом случае он не прошел весь процесс между ними.

Мой вопрос:

  1. Почему оба процесса перезаписи занимают разные временные интервалы, когда код одинаков df.write.mode('overwrite').parquet(target_location)
  2. Как повысить производительность последней перезаписи

1 Ответ

1 голос
/ 04 августа 2020

IIU C, в первом сценарии вы обрабатываете данные размером 700 ГБ, а во втором сценарии вы выполняете некоторое соединение между данными объемом 500 МБ и данными 700 ГБ, чтобы включить upserts и сохранить их обратно в s3.

Если это так, то проблема не в вашем операторе записи, а в преобразованиях, которые вы выполняете во втором случае, поскольку вы должны объединять инкрементные данные с полными данными для идентификации вставок и обновлений, затем изменив результаты соответствующим образом.

Если это будет продолжаться, ваши данные будут продолжать расти, и у вас будет не хватать времени и памяти для обработки ваших данных.

Если вы создаете инкрементную базу данных над s3, вы следует рассмотреть возможность использования HUDI, который упростит вашу работу, а также правильно использует ваш кластер для обработки только upserts.

Вот ссылка для реализации HUDI.

https://github.com/apache/hudi

...