Я экспериментирую с памятью, используемой искровым фреймом данных, созданным из файла CSV и файла Parquet (из того же набора данных).
Вопрос 1. Сохранение кадра данных из файла паркета, по-видимому, занимает память / диск в случайном порядке.Сохранение другого фрейма данных из файла CSV для чтения (данные, идентичные файлу Parquet) не занимает память / диск случайного разлива.Почему файл DF из файла Parquet выходит за пределы назначенной памяти в случайном порядке, в отличие от файла CSV DF?
Вопрос 2. Почему разлив в случайном порядке увеличивается с увеличением количества разделов?
Обратите внимание, чтоЯ сохраняю фрейм данных, чтобы сократить время извлечения для последующих действий.
Размер кластера: 4 ГБ памяти на каждого исполнителя (SIG имеет 4 исполнителя с общей памятью 16 ГБ, а для каждого слота используется случайное использование памяти).800 МБ - доля памяти в случайном порядке - 0,2).
20 разделов (чтение партера)Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 17,8 ГБСлучайный разлив (диск): 2,8 ГБ
1000 перегородок (читать паркет)Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 41,8 ГБСлучайный разлив (диск): 6,2 ГБ
cddf_csv = spark.read.option("delimiter", "|").option("header","true").option("inferSchema", "true").csv("filename.csv").repartition(20)
cddf_csv.persist()
cddf_csv.count()
CSV-файлы с 20 разделами
Сведения для этапа 23 (попытка 0)Общее время на все задачи: 13 минутКраткое описание уровня местности: Местный процесс: 45Размер ввода / записи: 5,5 ГБ / 2750579Перемешать запись: 3,8 ГБ / 2750579
Запись файла паркета из того же набора данных:
cddf.repartition(4).write.option("compression","snappy").parquet("test_cd")
Подсчет и сохранение после считывания файла паркета и перераспределения на 20 блоков
cddf_p20 = spark.read.parquet("test_cd").repartition(20)
cddf_p20.persist()
cddf_p20.count()
Метрики для 20 разделов с последующим подсчетом
Детали для этапа 16 (попытка 0)Общее время на все задачи: 13 минутКраткое описание уровня местности: Местный процесс: 10Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 17,8 ГБСлучайный разлив (диск): 2,8 ГБ
Две нижние записи являются кумулятивными по всем задачам
Подсчитывать и сохранять после считывания файла паркета и перераспределения на 1000 блоков
cddf_p1000 = spark.read.parquet("test_cd").repartition(1000)
cddf_p1000.persist()
cddf_p1000.count()
Далее следуют метрики для 1000 разделовпо счету
Общее время на все задачи: 15 минутКраткое описание уровня местности: Местный процесс: 10Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 41,8 ГБСлучайный разлив (диск): 6,2 ГБ
Две нижние записи совокупны для всех задач