Недостаточно памяти при попытке сохранить данные - PullRequest
1 голос
/ 09 мая 2019

Я сталкиваюсь с ошибкой нехватки памяти при попытке сохранить кадр данных, и я не совсем понимаю, почему. У меня есть примерно 20Gb данных с 2,5 миллионами строк и около 20 столбцов. После фильтрации этого информационного кадра у меня есть 4 столбца и 0,5 миллиона строк.

Теперь моя проблема заключается в том, что при сохранении отфильтрованного фрейма данных возникает ошибка нехватки памяти (превышает 25,4 ГБ используемой физической памяти 20 ГБ). Я пытался сохранить на разных уровнях хранения

df = spark.read.parquet(path) # 20 Gb
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something) # a few Gb
df_filter.persist(StorageLevel.MEMORY_AND_DISK) 
df_filter.count()

Мой кластер имеет 8 узлов с 30 ГБ памяти каждый.

Есть ли у вас какие-либо идеи, откуда может появиться эта ООМ?

1 Ответ

1 голос
/ 09 мая 2019

Просто несколько советов, которые помогут определить основную причину ...

Возможно, у вас есть (или комбинация) из ...

  1. асимметричные размеры разделенных разделов исходных данных, которыетрудно справиться и вызвать сборку мусора, OOM и т. д. (эти методы помогли мне, но могут быть более подходящие подходы для каждого случая использования)
# to check num partitions
df_filter.rdd.getNumPartitions()

# to repartition (**does cause shuffle**) to increase parallelism and help with data skew
df_filter.repartition(...) # monitor/debug performance in spark ui after setting
слишком мало / слишком много исполнителей / оперативной памяти / ядер, установленных в конфигурации
# check via
spark.sparkContext.getConf().getAll()

# these are the ones you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
широкое преобразование перетасовывает размер слишком мало / слишком много => попробуйте общие проверки отладки, чтобы просмотреть преобразования, которые будут запущены при сохранении + найти их # выходных разделов на диске
# debug directed acyclic graph [dag]
df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting

# check output partitions if shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")
...