Ошибка автономного Pyspark: слишком много открытых файлов - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть данные ~ 40 Гб (~ 80 м записей, только 2 столбца, текст) и я провел подсчет данных. Я мог бы успешно запустить его на экземпляре r5a.4xlarge на AWS. Занимает ок. 3 минуты, чтобы вернуть результаты. Однако, когда я изменяю экземпляр на больший, r5a.12xlarge, я получаю ошибку «Too Many Open Files», когда запускаю тот же код. Я пробовал несколько разных конфигураций для сеанса спарка, ни одна не работала. Кроме того, я увеличил предел LINUX открытых файлов до 4096, без изменений. Ниже приведен код и первая часть ошибки.

spark = (SparkSession
    .builder
    .appName('Project_name')
        .config('spark.executor.memory', "42G") #Tried 19G to 60G
        .config('spark.executor.instances', "4") #Tried 1 to 5 
        .config('spark.executor.cores', "4") #Tried 1 to 5 
        .config("spark.dynamicAllocation.enabled", "true") #Also tried without dynamic allocation
        .config("spark.dynamicAllocation.minExecutors","1")
        .config("spark.dynamicAllocation.maxExecutors","5")
        .config('spark.driver.memory', "42G") #Tried 19G to 60G
        .config('spark.driver.maxResultSize', '10G') #Tried 1G to 10G
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())

Ошибка:

>>> data.select(f.countDistinct("column_name")).show()

Py4JJavaError: An error occurred while calling o315.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 5.0 failed 1 times, most recent failure: Lost task 20.0 in stage 5.0 (TID 64, localhost, executor driver): java.io.FileNotFoundException: /tmp/spark-temp/blockmgr-c2f18891-a868-42ba-9075-dc145faaa4c4/16/temp_shuffle_f9c96d48-336d-423a-9edd-dcb9af5705a7 (Too many open files)

Есть мысли?

1 Ответ

0 голосов
/ 21 апреля 2020

Так как это огромный файл, то, когда spark читает файл, он создает 292 (292 * 128MB ~ 40G) разделов для файла. По умолчанию у spark есть spark. sql .shuffle.partitions = 200. Итак, вам просто нужно увеличить это число до числа, превышающего количество разделов. Кроме того, вы можете кэшировать файл в памяти для повышения производительности.

spark = (SparkSession
    .builder
    .appName('Project_name')
    .config('spark.executor.memory', "20G") 
    .config('spark.driver.memory', "20G") 
    .config('spark.driver.maxResultSize', '10G') 
    .config('spark.sql.shuffle.partitions',300) # Increasing SQL shuffle partitions
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())

>>> data.select(f.countDistinct("column_name")).show() # Result in ~2min
...