Я использую Spark 2.3.1 PySpark (AWS EMR)
Я получаю ошибки памяти:
Контейнер уничтожен YARN за превышение пределов памяти Рассмотрим возможность повышения spark.yarn.executor.memoryOverhead
У меня есть ввод 160 файлов, каждый файл около 350-400 МБ, каждый файл в формате CSV Gzip.
Для чтения файлов csv.gz (с подстановочными знаками), и я использую этоPyspark
dfgz = spark.read.load("s3://mybucket/yyyymm=201708/datafile_*.csv.gz",
format="csv", sep="^", inferSchema="false", header="false", multiLine="true", quote="^", nullValue="~", schema="id string,...."))
Чтобы сохранить фрейм данных, я использую это (PySpark)
(dfgz
.write
.partitionBy("yyyymm")
.mode("overwrite")
.format("parquet")
.option("path", "s3://mybucket/mytable_parquet")
.saveAsTable("data_test.mytable")
)
Одна строка кода для сохранения всех 160 файлов.
Я пробовал это с1 файл, и он работает нормально.
Общий размер всех 160 файлов (csv.gzip) составляет около 64 ГБ.
Каждый файл в виде чистого CSV, когда Unzipped составляет около 3,5 ГБ.Я предполагаю, что Spark может разархивировать каждый файл в ОЗУ и затем преобразовать его в Parquet в ОЗУ ??
Я хочу преобразовать каждый файл csv.gzip в формат Parquet, т.е. я хочу 160 файлов Parquet в качестве вывода (в идеале).
Задача выполняется некоторое время, и создается впечатление, что для каждого файла CSV.GZ создается 1 файл Parquet.Через некоторое время это всегда приводит к ошибке памяти Yarn.
Я пробовал различные настройки для памяти исполнителей и memoryOverhead, и все результаты не меняются - задания всегда терпят неудачу.Я пробовал memoryOverhead объемом до 1-8 ГБ и памятью 8G.
Помимо ручного разбиения рабочей нагрузки на входные 160 файлов на множество небольших рабочих нагрузок, что еще я могу сделать?Нужен ли кластер Spark с общей емкостью ОЗУ, превышающей 64 ГБ?Я использую 4 подчиненных узла, каждый из которых имеет 8 ЦП и 16 ГБ на узел (ведомые), плюс один мастер на 4 ЦП и 8 ГБ ОЗУ.
Это (с накладными расходами) менее 64 ГБ входного gzipCSV-файлы, которые я пытаюсь обработать, но файлы имеют равномерный размер 350-400 МБ, поэтому я не понимаю, почему Spark выдает ошибки памяти, поскольку он может легко обрабатывать эти 1 файл за раз для каждого исполнителя, отбрасывать его и переходить к следующему файлу,Это не похоже на работу таким образом.Я чувствую, что он пытается загрузить все входные файлы csv.gzip в память, но у меня нет возможности узнать это (я все еще новичок в Spark 2.3.1).
Позднее обновление: мне удалось получить егоработать со следующей конфигурацией памяти:
4 подчиненных узла, каждый 8 ЦП и 16 ГБ ОЗУ 1 главный узел, 4 ЦП и 8 ГБ ОЗУ:
spark maximizeResourceAllocation false
spark-defaults spark.driver.memoryOverhead 1g
spark-defaults spark.executor.memoryOverhead 2g
spark-defaults spark.executor.instances 8
spark-defaults spark.executor.cores 3
spark-defaults spark.default.parallelism 48
spark-defaults spark.driver.memory 6g
spark-defaults spark.executor.memory 6g
Само собой разумеется - яне могу объяснить, почему этот конфиг работал!Кроме того, это заняло 2 часа +, чтобы обработать 64 ГБ данных gzip, что кажется медленным даже для небольшого кластера 4 + 1 узла с общим количеством ЦП 32 + 4 и 64 + 8 ГБ ОЗУ.Возможно, узким местом было S3 .... ЧТО Я просто не ожидал микроуправления кластером базы данных для памяти, дискового ввода-вывода или выделения ресурсов процессора.
Обновление 2:
Я только что запустил другую загрузку в том же кластере с той же конфигурацией, меньшую загрузку из 129 файлов того же размера, и эта загрузка завершилась с ошибками памяти Yarn.Я очень разочарован управлением памятью Spark 2.3.1.
Спасибо за любые рекомендации