Записать спарк-фрейм в отдельный файл паркета - PullRequest
0 голосов
/ 06 сентября 2018

Я пытаюсь сделать что-то очень простое, и у меня очень глупая борьба. Я думаю, что это должно быть связано с фундаментальным неправильным пониманием того, что делает искра. Буду очень признателен за любую помощь или объяснение.

У меня есть очень большая таблица (~ 3 ТБ, ~ 300 ММ строк, 25 тыс. Разделов), сохраненная как паркет в s3, и я хотел бы дать кому-нибудь крошечный образец ее в виде одного файла паркета. К сожалению, это займет вечность, чтобы закончить, и я не понимаю, почему. Я пробовал следующее:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

и затем, когда это не сработало, я попробовал это, что я думал, что должно быть таким же, но я не был уверен. (Я добавил print для отладки.)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

Когда я смотрю пользовательский интерфейс Yarn, оба операторы печати и write используют сопоставления по 25 тыс. count занял 3 минуты, show занял 25 минут, а write занял ~ 40 минут, хотя в итоге сделал записать таблицу файлов, которую я искал.

Мне кажется, что первая строка должна взять первые 500 строк и объединить их в один раздел, а затем остальные строки должны произойти очень быстро (на одном преобразователе / ​​преобразователе). Кто-нибудь может увидеть, что я здесь делаю не так? Мне сказали, может быть, я должен использовать sample вместо limit, но, насколько я понимаю, limit должно быть намного быстрее. Это правильно?

Заранее спасибо за любые мысли!

Ответы [ 2 ]

0 голосов
/ 07 сентября 2018

Сначала я подойду к вопросу о print функциях, так как это нечто фундаментальное для понимания искры.Тогда limit против sample.Тогда repartition против coalesce.

Причины, по которым функции print занимают так много времени, заключаются в том, что coalesce является ленивым преобразованием.Большинство преобразований в искре ленивы и не оцениваются, пока не будет вызвано действие .

Действия - это вещи, которые делают вещи и (в основном) не возвращают новый фрейм данных в результате.Как count, show.Они возвращают число и некоторые данные, тогда как coalesce возвращает кадр данных с 1 разделом (вроде, см. Ниже).

Происходит то, что вы перезапускаете запрос sql и вызов coalesce каждый раз, когда вызываете действие на фрейме данных tiny.Вот почему они используют мапперы 25k для каждого вызова.

Чтобы сэкономить время, добавьте метод .cache() в первую строку (в любом случае, для вашего кода print).

Затемпреобразования кадров данных фактически выполняются в вашей первой строке, и результат сохраняется в памяти ваших искровых узлов.

Это не повлияет на начальное время запроса для первой строки, но, по крайней мере, вы не выполняете этот запрос еще 2 раза, потому что результат был кэширован, и тогда действия могут использовать этокэшированный результат.

Чтобы удалить его из памяти, используйте метод .unpersist().

Теперь для фактического запроса, который вы пытаетесь выполнить ...

Это действительно зависит от того, как разделены ваши данные.Например, разделены ли они на определенные поля и т. Д.

Вы упомянули об этом в своем вопросе, но sample может быть правильным решением.

Почему это так?

limit должен найти 500 из первых строк.Если ваши данные не разделены по номеру строки (или некоторому инкрементному идентификатору), то первые 500 строк могут быть сохранены в любом из 25k-разделов.

Таким образом, искра должна пройти поиск по всем из них, пока не найдет все правильные значения.Мало того, он должен выполнить дополнительный шаг сортировки данных, чтобы иметь правильный порядок.

sample просто захватывает 500 случайных значений.Гораздо проще сделать это, так как нет никакого порядка / сортировки задействованных данных, и нет необходимости искать в определенных разделах определенные строки.

Хотя limit может быть быстрее, оно также имеет свои ограничения.Я обычно использую его только для очень маленьких подмножеств, таких как 10/20 строк.

Теперь для разбиения ....

Я думаю, что проблема с coalesce в том, что фактически изменяет разбиение.Теперь я не уверен в этом, так что щепотка соли.

В соответствии с pyspark документами:

эта операция приводит к узкой зависимости, например, если вы перейдете от 1000 разделов к 100 разделам, вместо этого не будет перемешивания.каждый из 100 новых разделов будет претендовать на 10 из текущих разделов.

Таким образом, ваши 500 строк будут по-прежнему сидеть на ваших 25k физических разделах, которые, по мнению spark, являются 1 виртуальным разделом.

Возможно, хорошая идея - вызвать случайное перемешивание (обычно плохое) и сохранить его в искровой памяти с .repartition(1).cache().Потому что вместо того, чтобы маперы 25k смотрели на физические разделы, когда вы write, это должно привести только к 1 мапперу, смотрящему на то, что находится в искровой памяти.Тогда write становится легким.Вы также имеете дело с небольшим подмножеством, поэтому любая перестановка должна быть (надеюсь) управляемой.

Очевидно, что это, как правило, плохая практика, и это не меняет того факта, что spark, вероятно, захочет запускать 25k-сопоставителей при выполнении исходного SQL-запроса.Надеюсь sample позаботится об этом.

изменить для уточнения перемешивания, repartition и coalesce

У вас есть 2 набора данных в 16 разделах в кластере из 4 узлов.Вы хотите присоединиться к ним и написать как новый набор данных в 16 разделах.

Строка 1 для данных 1 может быть на узле 1, а строка 1 для данных 2 - на узле 4.

Чтобы присоединитьсяв этих рядах spark должен физически переместить один или оба из них, а затем записать в новый раздел.

Это случайное физическое перемещение данных вокруг кластера.

Неважно, что все разделено на 16, важно то, где находятся данные в кластере.

data.repartition(4) физически перемещает данные из каждых 4 наборов разделов на узел в 1 раздел на узел.

Spark может переместить все 4 раздела с узла 1 на 3 других узла, в новый отдельный раздел на этих узлах, и наоборот.

Я бы не подумал, что он это сделает, но это крайний случай, демонстрирующий это.

A coalesce(4) вызов, хотя, не перемещает данные, это намного умнее. Вместо этого он распознает: «У меня уже есть 4 раздела на узел и всего 4 узла ... Я просто назову все 4 из этих разделов на узел одним разделом, и тогда у меня будет 4 всего раздела!»

Так что не нужно перемещать какие-либо данные, потому что он просто объединяет существующие разделы в объединенный раздел.

0 голосов
/ 06 сентября 2018

Попробуйте, в моем эмпирическом опыте перераспределение работает лучше для таких проблем:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

Еще лучше, если вы заинтересованы в паркете, вам не нужно сохранять его в виде таблицы:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")
...