Разделение данных искры на разделы и параллельная запись этих разделов на диск - PullRequest
0 голосов
/ 09 мая 2020

Краткое описание проблемы: предположим, у меня есть 300+ ГБ данных, обрабатываемых с помощью Spark в кластере EMR в AWS. У этих данных есть три атрибута, используемых для разделения файловой системы для использования в Hive: дата, час и (скажем,) anotherAttr. Я хочу записать эти данные в файловую систему таким образом, чтобы минимизировать количество записываемых файлов.

То, что я делаю прямо сейчас, - это получение различных комбинаций даты, часа, anotherAttr и количества сколько рядов составляют комбинацию. Я собираю их в список в драйвере и перебираю список, создавая новый DataFrame для каждой комбинации, повторно разбивая этот DataFrame, используя количество строк для оценки размера файла, и записываю файлы на диск с помощью DataFrameWriter, .orc завершение он выключен.

Мы не используем Parquet по организационным причинам.

Этот метод работает достаточно хорошо и решает проблему, заключающуюся в том, что последующие группы, использующие Hive вместо Spark, не видят проблем с производительностью, приводящих к из большого количества файлов. Например, если я возьму весь DataFrame объемом 300 ГБ, сделаю переразбиение с 1000 разделами (в искре) и соответствующими столбцами и выгружу его на диск, все будет сброшено параллельно и завершится через ~ 9 минут со всем этим. Но это дает до 1000 файлов для больших разделов, что снижает производительность Hive. Или это разрушает какую-то производительность, честно говоря не уверен на 100% что. Меня только что попросили, чтобы количество файлов было как можно меньше. С помощью метода, который я использую, я могу сохранить файлы любого размера, который я хочу (в любом случае относительно близкого), но нет параллелизма и требуется ~ 45 минут для запуска, в основном ожидая записи файлов.

Мне кажется, что, поскольку между некоторой исходной строкой и некоторой целевой строкой существует связь один-к-одному, и, поскольку я могу организовать данные в неперекрывающиеся «папки» (разделы для Hive), я должен иметь возможность организовать мой код / ​​DataFrames таким образом, чтобы я мог попросить Spark записать все файлы назначения параллельно. Есть ли у кого-нибудь предложения, как атаковать это?

Вещи, которые я тестировал, но которые не работали:

  1. Использование параллельной коллекции scala для запуска записи . Что бы искра ни делала с DataFrames, она не очень хорошо разделяла задачи, и на некоторых машинах возникали огромные проблемы со сборкой мусора.

  2. DataFrame.map - я попытался сопоставить DataFrame уникальных комбинаций, и начальная запись записывается изнутри, но нет доступа к DataFrame данных, которые мне действительно нужны изнутри этого map - ссылка DataFrame на исполнителя пуста.

  3. DataFrame.mapPartitions - не запускается, не мог придумать никаких идей, как делать то, что я хочу изнутри mapPartitions

Слово «раздел» тоже не особенно полезно здесь, потому что это относится как к концепции искрового разделения данных по некоторым критериям, так и к способу организации данных на диске для Hive. Я думаю, что я был довольно ясен в приведенных выше примерах. Итак, если я представляю себе идеальное решение этой проблемы, я могу создать один DataFrame, который имеет 1000 разделов на основе трех атрибутов для быстрого запроса, а затем из этого создать еще одну коллекцию DataFrames, каждый из которых имеет ровно одну уникальную комбинацию эти атрибуты, перераспределенные (в Spark, но для Hive) с количеством разделов, соответствующим размеру содержащихся в нем данных. У большинства DataFrames будет 1 раздел, у некоторых - до 10. Размер файлов должен составлять ~ 3 ГБ, а наш кластер EMR имеет больше оперативной памяти, чем у каждого исполнителя, поэтому мы не должны видеть снижения производительности от них " большие разделы.

После того, как этот список DataFrames будет создан и каждый будет перераспределен, я могу попросить Spark записать их все на диск параллельно.

Возможно ли что-то подобное в Spark?

Одна вещь, которую я концептуально не понимаю: скажем, у меня есть

val x = spark.sql("select * from source")

и

val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")

и

val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")

Насколько y отличается от z DataFrame? Если я заново разбиваю y, какой эффект будет иметь перемешивание на z и x в этом отношении?

Ответы [ 2 ]

1 голос
/ 14 мая 2020

У нас была такая же проблема (почти), и мы закончили тем, что работали напрямую с RDD (вместо DataFrames) и реализовали наш собственный механизм разделения (путем расширения org. apache .spark.Partitioner)

Детали: читаем JSON сообщений от Кафки. JSON следует сгруппировать по полям customerid / date / more и записать в Had oop с использованием формата Parquet, не создавая слишком много маленьких файлов.

Шаги (упрощенная версия): a) Прочтите сообщения от Kafka и преобразовать их в структуру RDD [(GroupBy, Message)]. GroupBy - это класс case, содержащий все поля, которые используются для группировки.

b) Используйте преобразование reduceByKeyLocally и получите карту показателей (количество сообщений / размер сообщений / et c) для каждой группы - например, Map [GroupBy, GroupByMetrics]

c) Создайте GroupPartitioner, который использует ранее собранные метрики (и некоторые входные параметры, такие как желаемый размер Parquet et c), чтобы вычислить, сколько разделов должно быть создано для каждый объект GroupBy. По сути, мы расширяем org. apache .spark.Partitioner и переопределяем numPartitions и getPartition (key: Any)

d) мы разделяем RDD из a) с помощью ранее определенного средства разделения: newPartitionedRdd = rdd.partitionBy ( ourCustomGroupByPartitioner)

e) Вызвать spark.sparkContext.runJob с двумя параметрами: первый - это RDD, разделенный на d), второй - настраиваемая функция (fun c: (TaskContext, Iterator [T ]), который будет записывать сообщения, взятые из Iterator [T], в Hadoop / Parquet

Допустим, у нас есть 100 миллионов сообщений, сгруппированных таким образом

Group1 - 2 mil

Group2 - 80 мил

Group3 - 18 мил, и мы решили, что нам нужно использовать 1,5 миллиона сообщений на раздел для получения файлов Parquet размером более 500 МБ. В итоге мы получим 2 раздела для Group1, 54 для Group2 , 12 для группы 3.

0 голосов
/ 09 мая 2020

Этот оператор:

Я собираю их в список в драйвере и перебираю список, создавая новый DataFrame для каждой комбинации, перераспределяя этот DataFrame, используя количество строк для предполагаемого файла size и записывает файлы на диск с помощью DataFrameWriter, .или c заканчивая это.

совершенно не соответствует действительности, когда речь идет о Spark. Сбор данных в драйвер никогда не бывает хорошим подходом, проблемы с объемами и OOM, а также задержка в вашем подходе высоки. 1009 *

df.repartition(cols...)...write.partitionBy(cols...)...

перемешивание происходит через repartition, без перемешивания с partitionBy.

Все очень просто, с использованием параллелизма по умолчанию Spark.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...