искровой ETL с объединениями по нескольким источникам - PullRequest
0 голосов
/ 16 января 2019

У меня есть требование соединить 50 нечетных файлов на наборе из 3 ключей, используя кадры данных spark. У меня есть таблица драйверов, которая имеет 100000 записей в день. Я оставил объединение этой таблицы с 53 другими файлами, используя кадры данных следующим образом.

val df1 = spark.read.parquet(<driver file>)
val df2 = spark.read.parquet(<right side file1>)
.
.
val df52 = spark.read.parquet(<right side file 52>)
//join
val refinedDF1 = df1.join(df2,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
val refinedDF2 = refinedDF1.join(df3,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
 .
 .
 so on for all 50 odd files
 refinedFinalDF.write.parquet(<s3 location>)

выполнение завершается с ошибкой

Контейнер вышел с ненулевым кодом выхода 52

Что, по сути, является исключением из памяти. У меня довольно большой кластер для набора данных из 100 000 записей. У меня есть EMR с 12 исполнителями по 16G каждый и памятью 20G.

Я попытался разделить фреймы данных вручную на 200 разделов с помощью df.repartition (200) в режиме циклического перебора, и это не помогло вообще. Среди ключей объединения только ключ1 отличается для всех записей, а ключ2 и ключ3 - одинаковые значения для всех записей. Есть ли какая-либо оптимизация, которая может быть сделана, чтобы заставить это работать? Конечный фрейм данных, который я пытаюсь сохранить, содержит более 140 столбцов. Если в таблице драйверов есть n записей, то после каждого левого внешнего я получаю только n записей.

Обновление: Я попытался создать меньший массив данных из таблицы драйверов с ограничением (100), и я все еще получаю исключение нехватки памяти.

Ответы [ 3 ]

0 голосов
/ 17 января 2019

так получилось, базовые данные в корзине s3, которые я использую для создания фрейма данных, содержат несколько папок, и я фильтровал определенную папку как часть моего фильтра. пример: spark.read.parquet (s3 bucket) .filter ('folder_name = "val"). Похоже, что спарк загружает все данные из корзины s3 в память исполнителя, а затем запускает фильтр. Вот почему это был взрыв, когда та же логика, что и запрос улья, выполняемый для внешней таблицы улья, указывающего на местоположение s3 с папкой в ​​виде столбца раздела, работала просто отлично. Мне пришлось удалить фильтр и прочитать конкретную папку, чтобы решить проблему .. spark.read.parquet (s3 bucket / folder = value) ..

0 голосов
/ 18 января 2019

У меня было похожее состояние, когда у меня было несколько объединений, и в конце мне пришлось записать окончательный кадр данных в таблицу HDFS / Hive (формат паркета).

Spark работает на механизме отложенного выполнения, который означает, что когда ваш 53-й кадр данных активирован (Сохранить / записать как паркет), Spark затем возвращается ко всем соединениям и выполняет их, что вызывает огромную перетасовку данных и, в конечном итоге, вашу работу. контейнеры дают сбой и выбрасывают ошибки памяти.

Предложение: Вы можете сначала записать каждый присоединенный кадр данных в HDFS. Я хочу сказать, что после того, как вы объединили 2 (может быть более 2, но ограничьте их) данных, запишите объединенный кадр данных в HDFS / Hive и используйте select * 'hive parquet table

val refinedDF1 = df1.join(df2 ,condition,'join_type')
refinedDF1.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine1")
val refinedDF1 = hc.sql("select * from dbname.refine1")

val refinedDF2 = refinedDF1.join(df3)
refinedDF2.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine2")
val refinedDF2 = hc.sql("select * from dbname.refine2")

Теперь вы часто записываете свои соединения в hdfs, это означает, что искре не нужно будет выполнять их при вызове окончательного соединения, ему нужно будет использовать только вывод 52-го соединения, который вы сохранили в виде таблицы.

С использованием этой методологии мой сценарий сократился с 22 часов (включая ошибки памяти контейнера) до 15-30 (без исключений / ошибок памяти).

Несколько советов:

1) Исключить записи, в которых ваше объединение key равно нулю, spark не дает хорошей производительности по сравнению с объединениями, имеющими условие null = null, поэтому удалите их перед присоединением к фреймам данных

2) Используйте широковещательные объединения, когда у вас осталось много строк, а правый - искомый или несколько строк.

3) После выполнения скрипта вам придется очистить промежуточные кадры данных, которые вы сохраняете в Hive / Hdfs.

0 голосов
/ 16 января 2019

Ваши таблицы 1-1 или 1-много? Если они один ко многим, то ваши объединения приведут к появлению гораздо большего количества строк, чем вы, вероятно, хотите. Если дело обстоит именно так, один из вариантов - сначала выполнить groupBy для каждой таблицы, к которой вы собираетесь присоединиться. Рассмотрите этот пример:

val df1 = Seq(1, 2).toDF("id")
val df2 = Seq(
  (1, "a", true),
  (1, "b", false),
  (2, "c", true)
).toDF("id", "C2", "B2")

val df3 = Seq(
  (1, "x", false),
  (1, "y", true),
  (2, "z", false)
).toDF("id", "C3", "B3")

// Left outer join without accounting for 1-Many relationship.  Results in cartesian
// joining on each ID value!
df1.
  join(df2, Seq("id"), "left_outer").
  join(df3, Seq("id"), "left_outer").show()

+---+---+-----+---+-----+
| id| C2|   B2| C3|   B3|
+---+---+-----+---+-----+
|  1|  b|false|  y| true|
|  1|  b|false|  x|false|
|  1|  a| true|  y| true|
|  1|  a| true|  x|false|
|  2|  c| true|  z|false|
+---+---+-----+---+-----+

В качестве альтернативы, если вы сгруппируете строки перед объединением, чтобы ваши отношения всегда были 1-1, вы не получите добавляемых записей

val df2Grouped = df2.groupBy("id").agg(collect_list(struct($"C2", $"B2")) as "df2")
val df3Grouped = df3.groupBy("id").agg(collect_list(struct($"C3", $"B3")) as "df3")

val result = df1.
  join(df2Grouped, Seq("id"), "left_outer").
  join(df3Grouped, Seq("id"), "left_outer")
result.printSchema
result.show(10, false)

scala> result.printSchema
root
 |-- id: integer (nullable = false)
 |-- df2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C2: string (nullable = true)
 |    |    |-- B2: boolean (nullable = false)
 |-- df3: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C3: string (nullable = true)
 |    |    |-- B3: boolean (nullable = false)


scala> result.show(10, false)
+---+-----------------------+-----------------------+
|id |df2                    |df3                    |
+---+-----------------------+-----------------------+
|1  |[[a, true], [b, false]]|[[x, false], [y, true]]|
|2  |[[c, true]]            |[[z, false]]           |
+---+-----------------------+-----------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...