Просто несколько советов, которые помогут определить основную причину ...
Возможно, у вас есть (или комбинация) из ...
- асимметричные размеры разделенных разделов исходных данных, которыетрудно справиться и вызвать сборку мусора, OOM и т. д. (эти методы помогли мне, но могут быть более подходящие подходы для каждого случая использования)
# to check num partitions
df_filter.rdd.getNumPartitions()
# to repartition (**does cause shuffle**) to increase parallelism and help with data skew
df_filter.repartition(...) # monitor/debug performance in spark ui after setting
слишком мало / слишком много исполнителей / оперативной памяти / ядер, установленных в конфигурации
# check via
spark.sparkContext.getConf().getAll()
# these are the ones you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
широкое преобразование перетасовывает размер слишком мало / слишком много => попробуйте общие проверки отладки, чтобы просмотреть преобразования, которые будут запущены при сохранении + найти их # выходных разделов на диске
# debug directed acyclic graph [dag]
df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting
# check output partitions if shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")