Spark Join Лучшие вопросы эффективности матчей - PullRequest
0 голосов
/ 14 октября 2019

У меня есть 2 кадра данных:

  • df_1 с примерно 500 миллионами записей и ~ 100 столбцами
  • df_2 с ~ 50 миллионами записей и 4 столбцами

Мне нужно соединиться слева df_1 с df_2 с точным совпадением по двум столбцам и лучшим совпадением по третьему. Под лучшим соответствием я подразумеваю одно: множество отношений слева направо, однако я хочу получить только лучшее соответствие справа по длине.

например

# df_1
col1    col2    col3
---------------------------
a       b       abcde
# df_2
col1    col2    col3    col4
-------------------------------
a       b       a       90
a       b       ab      100
a       b       abc     150
a       c       abc     90

Таким образом, желаемый результат объединения, когда я точно сопоставляю col1 и col2 и col3 на лучшее совпадение содержащихся строк:

col1    col2    col3    col4
-------------------------------
a       b       abcde   150

Некоторые очки работают против меняздесь:

  • Длина col3 с левой стороны обычно составляет от 10 до 15 символов, с правой стороны может варьироваться от 1 до 9 символов
  • Обаdf_1 и df_2 аналогично перекошены на col3

Пока я работаю, я получаю ужасную производительность .

Я испробовал следующие решения и до сих пор никуда не добираюсь:

  • вещание df_2 (слишком велик для вещания)
  • присоединиться точно кcol1 и col1 и использовать like на col3 (ужасно)
  • взорвать значения на col3 в df_2, чтобы попытаться бороться с перекосом (улучшение, но все еще медленное)
  • сохраняют данные и перебирают каждую длину в правой части и соединяются точно при объединении col1, col2 и col3 (где объединение левой стороны является подстрокой col3) (улучшается, но все еще медленно)

Какой самый эффективный способ сделать это соединение с использованием spark?

1 Ответ

0 голосов
/ 14 октября 2019

Лучшим вариантом является уменьшение размера данных перед присоединением (мы не можем уничтожить присоединение). Мы можем уменьшить, как показано ниже:

Во-первых, загрузка данных

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df1.show
+---+---+-----+
| c1| c2|   c3|
+---+---+-----+
|  a|  b|abcde|
|  c|  d|   fd|
+---+---+-----+

scala> df2.show
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  a|  b|   a| 90|
|  a|  b| abd|100|
|  a|  b|abcd|150|
|  c|  d|wewe| 79|
+---+---+----+---+

Теперь нам нужно уменьшить размер df2 перед присоединением (это уменьшит время, необходимое для объединения, так как размер данныхменьше сравнивать) с помощью оконной функции и определения максимального значения обоих столбцов

scala> df2.withColumn("len", length($"c3")).withColumn("res", row_number().over(wind1)).filter($"res" === 1).withColumn("res2", row_number().over(wind2)).filter($"res2"=== 1).select("c1", "c2", "c3", "c4").show()
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  c|  d|wewe| 79|
|  a|  b|abcd|150|
+---+---+----+---+

вещей, которые можно попробовать:

1> Вы можете присоединиться к этим сокращенным фреймам данных и применить используемую логику

2> Попробуйте выполнить объединение df1.withColumn("c4", lit(0)).union(df2), а затем примените приведенную выше логику.

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...