Сохранение кадра данных из файла Parquet приводит к случайному разливу - PullRequest
0 голосов
/ 25 сентября 2019

Я экспериментирую с памятью, используемой искровым фреймом данных, созданным из файла CSV и файла Parquet (из того же набора данных).

Вопрос 1. Сохранение кадра данных из файла паркета, по-видимому, занимает память / диск в случайном порядке.Сохранение другого фрейма данных из файла CSV для чтения (данные, идентичные файлу Parquet) не занимает память / диск случайного разлива.Почему файл DF из файла Parquet выходит за пределы назначенной памяти в случайном порядке, в отличие от файла CSV DF?

Вопрос 2. Почему разлив в случайном порядке увеличивается с увеличением количества разделов?

Обратите внимание, чтоЯ сохраняю фрейм данных, чтобы сократить время извлечения для последующих действий.

Размер кластера: 4 ГБ памяти на каждого исполнителя (SIG имеет 4 исполнителя с общей памятью 16 ГБ, а для каждого слота используется случайное использование памяти).800 МБ - доля памяти в случайном порядке - 0,2).

20 разделов (чтение партера)Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 17,8 ГБСлучайный разлив (диск): 2,8 ГБ

1000 перегородок (читать паркет)Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 41,8 ГБСлучайный разлив (диск): 6,2 ГБ


cddf_csv = spark.read.option("delimiter", "|").option("header","true").option("inferSchema", "true").csv("filename.csv").repartition(20)
cddf_csv.persist()
cddf_csv.count()

CSV-файлы с 20 разделами

Сведения для этапа 23 (попытка 0)Общее время на все задачи: 13 минутКраткое описание уровня местности: Местный процесс: 45Размер ввода / записи: 5,5 ГБ / 2750579Перемешать запись: 3,8 ГБ / 2750579


Запись файла паркета из того же набора данных:

cddf.repartition(4).write.option("compression","snappy").parquet("test_cd")

Подсчет и сохранение после считывания файла паркета и перераспределения на 20 блоков

cddf_p20 = spark.read.parquet("test_cd").repartition(20)
cddf_p20.persist()
cddf_p20.count()

Метрики для 20 разделов с последующим подсчетом

Детали для этапа 16 (попытка 0)Общее время на все задачи: 13 минутКраткое описание уровня местности: Местный процесс: 10Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 17,8 ГБСлучайный разлив (диск): 2,8 ГБ

Две нижние записи являются кумулятивными по всем задачам


Подсчитывать и сохранять после считывания файла паркета и перераспределения на 1000 блоков

cddf_p1000 = spark.read.parquet("test_cd").repartition(1000)
cddf_p1000.persist()
cddf_p1000.count()

Далее следуют метрики для 1000 разделовпо счету

Общее время на все задачи: 15 минутКраткое описание уровня местности: Местный процесс: 10Размер ввода / записи: 1264,7 МБ / 2750579Перемешать запись: 3,9 ГБ / 2750579Случайный разлив (память): 41,8 ГБСлучайный разлив (диск): 6,2 ГБ

Две нижние записи совокупны для всех задач

...