Зак, у меня есть похожий вариант использования с 'n' разным количеством файлов для ежедневной обработки. Я собираюсь предположить, что вы используете приведенный выше код как есть и пытаетесь улучшить производительность всей работы. Вот несколько моих наблюдений:
Не уверен, что такое число coalesce(numPartitions)
на самом деле и почему оно используется до процесса дедупликации. Ваш иск-отправка показывает, что вы создаете 1600 разделов, и этого достаточно для начала.
Если вы собираетесь переразбить перед записью, то объединение, описанное выше, может вообще не быть полезным, так как -partition перетасует данные.
Поскольку вы заявляете, что записываете 10-20 паркетных файлов, это означает, что вы используете только 10-20 ядер для записи в последней части вашей работы, которая является основной. причина его медленная Исходя из оценки в 100 ГБ, размер файла партера составляет от 5 ГБ до 10 ГБ, что действительно огромно, и я сомневаюсь, что кто-то сможет открыть их на своем локальном ноутбуке или компьютере EC2, если они не используют EMR или подобное (с огромной памятью исполнителя при чтении весь файл или разлив на диск), поскольку требования к памяти будут слишком высокими. Я рекомендую создавать файлы паркета размером около 1 ГБ, чтобы избежать любой из этих проблем.
Кроме того, если вы создаете файл паркета объемом 1 ГБ, вы, скорее всего, ускорите процесс от 5 до 10 раз, как и вы. использовать больше исполнителей / ядер, чтобы писать их параллельно. На самом деле вы можете запустить эксперимент, просто написав фрейм данных с разделами по умолчанию.
Это подводит меня к тому, что вам действительно не нужно использовать переразметку, как вы хотите write.partitionBy ("partition_date") ) вызов. Ваш repartition()
вызов фактически заставляет информационный фрейм иметь максимум 30-31 разделов в зависимости от количества дней в этом месяце, которое определяет количество записываемых файлов. write.partitionBy("partition_date")
фактически записывает данные в раздел S3, и если ваш фрейм данных имеет, скажем, 90 разделов, он будет писать в 3 раза быстрее (3 * 30). df.repartition()
заставляет его замедлять. Вам действительно нужно иметь файлы размером 5 ГБ или больше?
Еще один важный момент - ленивая оценка Spark иногда слишком умна. В вашем случае он, скорее всего, будет использовать только количество исполнителей для всей программы на основе repartition(number)
. Вместо этого вы должны попробовать, df.cache() -> df.count() and then df.write()
. То, что это делает, - то, что это заставляет искру использовать все доступные ядра исполнителя. Я предполагаю, что вы читаете файлы параллельно. В вашей текущей реализации вы, вероятно, используете 20-30 ядер. Одно предостережение, поскольку вы используете машины r4 / r5, не стесняйтесь увеличить память вашего исполнителя до 48G с 8 ядрами. Я обнаружил, что 8 ядер быстрее для моей задачи вместо стандартных 5-ядерных рекомендаций.
Еще один указатель - попробовать ParallelG C вместо G1G C. Для случая использования, подобного этому, когда вы читаете 1000x файлов, я заметил, что он работает лучше или не хуже, чем G1G c. Пожалуйста, попробуйте.
В своей рабочей нагрузке я использую подход coalesce(n)
, где 'n' дает мне 1 ГБ паркетный файл. Я читаю файлы параллельно, используя ВСЕ ядра, доступные в кластере. Только во время записи мои ядра простаивают, но вы не можете этого избежать.
Я не уверен, как spark.sql.files.maxRecordsPerFile
работает в сочетании с coalesce() or repartition()
, но я обнаружил, что 1 ГБ кажется приемлемым с pandas, спектр красного смещения, Афина и др. c.
Надеюсь, это поможет. Чара