Как оптимизировать Spark для записи больших объемов данных в S3 - PullRequest
1 голос
/ 07 января 2020

Я делаю изрядное количество ETL, используя Apache Spark на EMR.

Я вполне доволен большинством настроек, необходимых для получения хорошей производительности, но у меня есть одна работа, которую я не могу Похоже, что это понятно.

По сути, я беру около 1 ТБ паркетных данных - распределенных по десяткам тысяч файлов в S3 - и добавляю несколько столбцов и выписываю их, разделенные одним из атрибутов даты данных - снова, паркет отформатирован в S3.

Я запускаю так:

spark-submit --conf spark.dynamicAllocation.enabled=true  --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf  spark.executor.memoryOverhead=5120 --conf  spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>

Размер кластера определяется динамически в зависимости от размера набора входных данных, и аргументы num-executors, spark. sql .shuffle.partitions и spark.default.parallelism рассчитываются на основе размера кластера.

Код примерно так делает:

va df = (read from s3 and add a few columns like timestamp and source file name)

val dfPartitioned = df.coalesce(numPartitions)

val sqlDFProdDedup = spark.sql(s""" (query to dedup against prod data """);

sqlDFProdDedup.repartition($"partition_column")
  .write.partitionBy("partition_column")
  .mode(SaveMode.Append).parquet(outputPath)

Когда я смотрю на диаграмму ганглиев, я получаю огромный всплеск ресурсов, в то время как логика дедупликации c работает и некоторые данные перемешиваются, но тогда фактическая запись данных использует только крошечную долю ресурсов и в течение нескольких часов.

Я не думаю, что основной проблемой является перекос разделов, потому что данные должны быть справедливо распределены по всем разделам.

Столбец разделов - это, по сути, день месяца, поэтому каждое задание обычно имеет только 5-20 разделов, в зависимости от диапазона набора входных данных. Каждый раздел обычно содержит около 100 ГБ данных в 10-20 файлах паркета.

Я устанавливаю режим spark. sql .files.maxRecordsPerFile для управления размером этих выходных файлов.

Итак, мой главный вопрос: как я могу улучшить производительность здесь?

Простое добавление ресурсов, кажется, не очень помогает.

Я пытался увеличить исполнителей (чтобы уменьшить перетасовку) ), а также увеличить количество процессоров на одного исполнителя, но это не имеет значения.

Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 16 апреля 2020

Зак, у меня есть похожий вариант использования с 'n' разным количеством файлов для ежедневной обработки. Я собираюсь предположить, что вы используете приведенный выше код как есть и пытаетесь улучшить производительность всей работы. Вот несколько моих наблюдений:

  1. Не уверен, что такое число coalesce(numPartitions) на самом деле и почему оно используется до процесса дедупликации. Ваш иск-отправка показывает, что вы создаете 1600 разделов, и этого достаточно для начала.

  2. Если вы собираетесь переразбить перед записью, то объединение, описанное выше, может вообще не быть полезным, так как -partition перетасует данные.

  3. Поскольку вы заявляете, что записываете 10-20 паркетных файлов, это означает, что вы используете только 10-20 ядер для записи в последней части вашей работы, которая является основной. причина его медленная Исходя из оценки в 100 ГБ, размер файла партера составляет от 5 ГБ до 10 ГБ, что действительно огромно, и я сомневаюсь, что кто-то сможет открыть их на своем локальном ноутбуке или компьютере EC2, если они не используют EMR или подобное (с огромной памятью исполнителя при чтении весь файл или разлив на диск), поскольку требования к памяти будут слишком высокими. Я рекомендую создавать файлы паркета размером около 1 ГБ, чтобы избежать любой из этих проблем.

Кроме того, если вы создаете файл паркета объемом 1 ГБ, вы, скорее всего, ускорите процесс от 5 до 10 раз, как и вы. использовать больше исполнителей / ядер, чтобы писать их параллельно. На самом деле вы можете запустить эксперимент, просто написав фрейм данных с разделами по умолчанию.

Это подводит меня к тому, что вам действительно не нужно использовать переразметку, как вы хотите write.partitionBy ("partition_date") ) вызов. Ваш repartition() вызов фактически заставляет информационный фрейм иметь максимум 30-31 разделов в зависимости от количества дней в этом месяце, которое определяет количество записываемых файлов. write.partitionBy("partition_date") фактически записывает данные в раздел S3, и если ваш фрейм данных имеет, скажем, 90 разделов, он будет писать в 3 раза быстрее (3 * 30). df.repartition() заставляет его замедлять. Вам действительно нужно иметь файлы размером 5 ГБ или больше?

Еще один важный момент - ленивая оценка Spark иногда слишком умна. В вашем случае он, скорее всего, будет использовать только количество исполнителей для всей программы на основе repartition(number). Вместо этого вы должны попробовать, df.cache() -> df.count() and then df.write(). То, что это делает, - то, что это заставляет искру использовать все доступные ядра исполнителя. Я предполагаю, что вы читаете файлы параллельно. В вашей текущей реализации вы, вероятно, используете 20-30 ядер. Одно предостережение, поскольку вы используете машины r4 / r5, не стесняйтесь увеличить память вашего исполнителя до 48G с 8 ядрами. Я обнаружил, что 8 ядер быстрее для моей задачи вместо стандартных 5-ядерных рекомендаций.

Еще один указатель - попробовать ParallelG C вместо G1G C. Для случая использования, подобного этому, когда вы читаете 1000x файлов, я заметил, что он работает лучше или не хуже, чем G1G c. Пожалуйста, попробуйте.

В своей рабочей нагрузке я использую подход coalesce(n), где 'n' дает мне 1 ГБ паркетный файл. Я читаю файлы параллельно, используя ВСЕ ядра, доступные в кластере. Только во время записи мои ядра простаивают, но вы не можете этого избежать.

Я не уверен, как spark.sql.files.maxRecordsPerFile работает в сочетании с coalesce() or repartition(), но я обнаружил, что 1 ГБ кажется приемлемым с pandas, спектр красного смещения, Афина и др. c.

Надеюсь, это поможет. Чара

0 голосов
/ 08 января 2020

Вот некоторые оптимизации для более быстрой работы.

(1) File committer - так Spark будет считывать файлы деталей в корзину S3. Каждая операция индивидуальна и будет основана на

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

Описание

Это позволит записывать файлы непосредственно в файлы деталей или вместо первоначальной загрузки их во временные файлы и копирования их в конечные файлы. файлы части состояния.

(2) Размер файла можно определить исходя из среднего числа байтов на запись. Ниже я вычисляю количество байтов на запись, чтобы вычислить количество записей для 1024 МБ. Я бы попробовал сначала с 1024 МБ на раздел, а затем переместиться вверх.

import org.apache.spark.util.SizeEstimator

val numberBytes : Long = SizeEstimator.estimate(inputDF.rdd)
val reduceBytesTo1024MB = numberBytes/123217728
val numberRecords = inputDF.count
val recordsFor1024MB = (numberRecords/reduceBytesTo1024MB).toInt + 1 

(3) [Я не пробовал это] EMR Committer - если вы используете EMR 5.19 или выше, так как вы выводите Паркет. Вы можете установить для оптимизированного Parquet Writer значение TRUE.

spark.sql.parquet.fs.optimized.committer.optimization-enabled true
...