Эффективные объединения в искровом фрейме - PullRequest
0 голосов
/ 16 октября 2018

Я пытаюсь присоединиться к DataFrame (dfA) последовательно в одном и том же DataFrame.Допустим, в dfA есть столбцы id_x и id_y, а в dfB есть столбец id и некоторые другие столбцы.

Я хочу выполнить следующее:

dfA.join(dfB, dfA("id_x") === dfB("id")).join(dfB, dfA("id_y") === dfB("id"))

Есть ликакой-либо вид перераспределения или предварительной обработки, который я могу сделать, чтобы ускорить это?

Ответы [ 2 ]

0 голосов
/ 17 октября 2018

Какую версию spark вы используете?Tuning Spark - это искусство и сама по себе обширная тема.Просто слепое увеличение количества разделов не всегда помогает.Я бы посоветовал взглянуть на следующие места для подсказки:

  1. Внимательно посмотрите на интерфейс Spark и проанализируйте свой DAG.Где узкое место?Ожидание процессора, памяти, дискового ввода-вывода?Слишком много тасования?
  2. Ваши данные искажены?Немногие задачи выполняются долго, в то время как большинство из них быстро заканчиваются?
  3. Какое преобразование вы использовали?Пожалуйста, вставьте фрагмент кода, если это возможно.
  4. Bucketing - это нечто новое в Spark, которое, как ожидается, поможет с объединениями.Но изучение вашей группы DAG всегда является лучшим источником подсказки.
  5. Также на основе вашего кода, когда вы хотите использовать dfA ("id_x") и dfA ("id_y"), чтобы присоединиться кDFB ( "ID")?Вероятно, вы можете попробовать что-то ниже, чем ИЛИ в условии соединения

    val joinCondition = when ($ "dfA.id_y" .isNull, $ "dfA.id_y" === $ "dfB.id").в противном случае ($ "dfA.id_x" === $ "dfB.id")

    val dfJoined = dfA.join (dfB, joinCondition)

Пожалуйста, дайте мнезнать свои выводы.

0 голосов
/ 16 октября 2018

Вы можете сделать это в 1 соединении:

dfA.join(dfB, dfA("id_x") === dfB("id") or dfA("id_y") === dfB("id"))

Вы также можете играть с spark.sql.shuffle.partitions или попытаться транслировать один кадр данных.Перераспределение перед объединением не поможет, но использование таблиц с разбивкой может помочь, потому что это может избежать повторного распределения во время объединения, см., Например, https://issues.apache.org/jira/browse/SPARK-12394

...