Spark CSV GZip для паркета? - PullRequest
       20

Spark CSV GZip для паркета?

0 голосов
/ 04 октября 2018

Я использую Spark 2.3.1 PySpark (AWS EMR)

Я получаю ошибки памяти:

Контейнер уничтожен YARN за превышение пределов памяти Рассмотрим возможность повышения spark.yarn.executor.memoryOverhead

У меня есть ввод 160 файлов, каждый файл около 350-400 МБ, каждый файл в формате CSV Gzip.

Для чтения файлов csv.gz (с подстановочными знаками), и я использую этоPyspark

dfgz = spark.read.load("s3://mybucket/yyyymm=201708/datafile_*.csv.gz",
    format="csv", sep="^", inferSchema="false", header="false", multiLine="true", quote="^", nullValue="~", schema="id string,...."))

Чтобы сохранить фрейм данных, я использую это (PySpark)

(dfgz
.write
.partitionBy("yyyymm")
.mode("overwrite") 
.format("parquet")
.option("path", "s3://mybucket/mytable_parquet")
.saveAsTable("data_test.mytable")
)    

Одна строка кода для сохранения всех 160 файлов.

Я пробовал это с1 файл, и он работает нормально.

Общий размер всех 160 файлов (csv.gzip) составляет около 64 ГБ.

Каждый файл в виде чистого CSV, когда Unzipped составляет около 3,5 ГБ.Я предполагаю, что Spark может разархивировать каждый файл в ОЗУ и затем преобразовать его в Parquet в ОЗУ ??

Я хочу преобразовать каждый файл csv.gzip в формат Parquet, т.е. я хочу 160 файлов Parquet в качестве вывода (в идеале).

Задача выполняется некоторое время, и создается впечатление, что для каждого файла CSV.GZ создается 1 файл Parquet.Через некоторое время это всегда приводит к ошибке памяти Yarn.

Я пробовал различные настройки для памяти исполнителей и memoryOverhead, и все результаты не меняются - задания всегда терпят неудачу.Я пробовал memoryOverhead объемом до 1-8 ГБ и памятью 8G.

Помимо ручного разбиения рабочей нагрузки на входные 160 файлов на множество небольших рабочих нагрузок, что еще я могу сделать?Нужен ли кластер Spark с общей емкостью ОЗУ, превышающей 64 ГБ?Я использую 4 подчиненных узла, каждый из которых имеет 8 ЦП и 16 ГБ на узел (ведомые), плюс один мастер на 4 ЦП и 8 ГБ ОЗУ.

Это (с накладными расходами) менее 64 ГБ входного gzipCSV-файлы, которые я пытаюсь обработать, но файлы имеют равномерный размер 350-400 МБ, поэтому я не понимаю, почему Spark выдает ошибки памяти, поскольку он может легко обрабатывать эти 1 файл за раз для каждого исполнителя, отбрасывать его и переходить к следующему файлу,Это не похоже на работу таким образом.Я чувствую, что он пытается загрузить все входные файлы csv.gzip в память, но у меня нет возможности узнать это (я все еще новичок в Spark 2.3.1).

Позднее обновление: мне удалось получить егоработать со следующей конфигурацией памяти:

4 подчиненных узла, каждый 8 ЦП и 16 ГБ ОЗУ 1 главный узел, 4 ЦП и 8 ГБ ОЗУ:

spark   maximizeResourceAllocation  false
spark-defaults  spark.driver.memoryOverhead 1g
spark-defaults  spark.executor.memoryOverhead   2g
spark-defaults  spark.executor.instances    8
spark-defaults  spark.executor.cores    3
spark-defaults  spark.default.parallelism   48
spark-defaults  spark.driver.memory 6g
spark-defaults  spark.executor.memory   6g

Само собой разумеется - яне могу объяснить, почему этот конфиг работал!Кроме того, это заняло 2 часа +, чтобы обработать 64 ГБ данных gzip, что кажется медленным даже для небольшого кластера 4 + 1 узла с общим количеством ЦП 32 + 4 и 64 + 8 ГБ ОЗУ.Возможно, узким местом было S3 .... ЧТО Я просто не ожидал микроуправления кластером базы данных для памяти, дискового ввода-вывода или выделения ресурсов процессора.

Обновление 2:

Я только что запустил другую загрузку в том же кластере с той же конфигурацией, меньшую загрузку из 129 файлов того же размера, и эта загрузка завершилась с ошибками памяти Yarn.Я очень разочарован управлением памятью Spark 2.3.1.

Спасибо за любые рекомендации

...