Краткое описание проблемы: предположим, у меня есть 300+ ГБ данных, обрабатываемых с помощью Spark в кластере EMR в AWS. У этих данных есть три атрибута, используемых для разделения файловой системы для использования в Hive: дата, час и (скажем,) anotherAttr. Я хочу записать эти данные в файловую систему таким образом, чтобы минимизировать количество записываемых файлов.
То, что я делаю прямо сейчас, - это получение различных комбинаций даты, часа, anotherAttr и количества сколько рядов составляют комбинацию. Я собираю их в список в драйвере и перебираю список, создавая новый DataFrame для каждой комбинации, повторно разбивая этот DataFrame, используя количество строк для оценки размера файла, и записываю файлы на диск с помощью DataFrameWriter, .orc
завершение он выключен.
Мы не используем Parquet по организационным причинам.
Этот метод работает достаточно хорошо и решает проблему, заключающуюся в том, что последующие группы, использующие Hive вместо Spark, не видят проблем с производительностью, приводящих к из большого количества файлов. Например, если я возьму весь DataFrame объемом 300 ГБ, сделаю переразбиение с 1000 разделами (в искре) и соответствующими столбцами и выгружу его на диск, все будет сброшено параллельно и завершится через ~ 9 минут со всем этим. Но это дает до 1000 файлов для больших разделов, что снижает производительность Hive. Или это разрушает какую-то производительность, честно говоря не уверен на 100% что. Меня только что попросили, чтобы количество файлов было как можно меньше. С помощью метода, который я использую, я могу сохранить файлы любого размера, который я хочу (в любом случае относительно близкого), но нет параллелизма и требуется ~ 45 минут для запуска, в основном ожидая записи файлов.
Мне кажется, что, поскольку между некоторой исходной строкой и некоторой целевой строкой существует связь один-к-одному, и, поскольку я могу организовать данные в неперекрывающиеся «папки» (разделы для Hive), я должен иметь возможность организовать мой код / DataFrames таким образом, чтобы я мог попросить Spark записать все файлы назначения параллельно. Есть ли у кого-нибудь предложения, как атаковать это?
Вещи, которые я тестировал, но которые не работали:
Использование параллельной коллекции scala для запуска записи . Что бы искра ни делала с DataFrames, она не очень хорошо разделяла задачи, и на некоторых машинах возникали огромные проблемы со сборкой мусора.
DataFrame.map - я попытался сопоставить DataFrame уникальных комбинаций, и начальная запись записывается изнутри, но нет доступа к DataFrame данных, которые мне действительно нужны изнутри этого map
- ссылка DataFrame на исполнителя пуста.
DataFrame.mapPartitions - не запускается, не мог придумать никаких идей, как делать то, что я хочу изнутри mapPartitions
Слово «раздел» тоже не особенно полезно здесь, потому что это относится как к концепции искрового разделения данных по некоторым критериям, так и к способу организации данных на диске для Hive. Я думаю, что я был довольно ясен в приведенных выше примерах. Итак, если я представляю себе идеальное решение этой проблемы, я могу создать один DataFrame, который имеет 1000 разделов на основе трех атрибутов для быстрого запроса, а затем из этого создать еще одну коллекцию DataFrames, каждый из которых имеет ровно одну уникальную комбинацию эти атрибуты, перераспределенные (в Spark, но для Hive) с количеством разделов, соответствующим размеру содержащихся в нем данных. У большинства DataFrames будет 1 раздел, у некоторых - до 10. Размер файлов должен составлять ~ 3 ГБ, а наш кластер EMR имеет больше оперативной памяти, чем у каждого исполнителя, поэтому мы не должны видеть снижения производительности от них " большие разделы.
После того, как этот список DataFrames будет создан и каждый будет перераспределен, я могу попросить Spark записать их все на диск параллельно.
Возможно ли что-то подобное в Spark?
Одна вещь, которую я концептуально не понимаю: скажем, у меня есть
val x = spark.sql("select * from source")
и
val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")
и
val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")
Насколько y
отличается от z
DataFrame? Если я заново разбиваю y
, какой эффект будет иметь перемешивание на z
и x
в этом отношении?