гибкое условие соединения в Spark (Scala) - PullRequest
0 голосов
/ 11 марта 2019

Я хочу иметь условие гибкого соединения, которое можно передать, например, в виде строки (или любого другого предложения?). Например, в следующем выражении выражение FLEXIBLE_CONDITION может изменяться в разных прогонах.

val df3 = df1.join(df2, FLEXIBLE_CONDITION, "fullouter")

Несколько примеров:

 (1) df1(s"query") === df2 (s"query_df2") 
 (2) df1(s"id") === df2(s"id_df2") && df1(s"item") === df2(s"item_df2")
 (3) Or combination of (1) and (2) or any other condition

Следует отметить, что имена столбцов, которые будут объединяться, будут основаны на них. Например, в (1), в df1 имя столбца является запросом, а в df2 это query_df2 и так далее.

FLEXIBLE_CONDITION не должен быть жестко закодирован, но может быть вводным и часто меняться. Или может быть автоматизирован на основе набора ввода (например, имена столбцов).

Ответы [ 2 ]

0 голосов
/ 11 марта 2019

Я понял это. Вот что я искал:

 val first :  String = unique_attrs(0)
 var expression : org.apache.spark.sql.Column = df1(first) === df2_r(s"$first" + "_df2")
 for (i <- 1 to unique_attrs.length - 1) {
   val attr : String = unique_attrs(1)
   expression = expression && df1(attr) === df2_r(s"$attr" + "_df2")
 }

 val df3 = df1.join(df2_r, expression, "fullouter")

Список атрибутов дается в качестве входных данных (unique_attrs) для метода.

0 голосов
/ 11 марта 2019

Вы можете указать выражение, которое следует использовать в соединении

подпись для этого

def join(right: Dataset[_], joinExprs: Column): DataFrame

Например,

val df1 = Seq(
    ("a1", "b1"),
    ("a2", "b2")
).toDF("a", "b")

val df2 = Seq(
    ("b1", "a1"),
    ("b2", "a2")
).toDF("b1", "a1")

df1.show
df2.show

выход

+---+---+
|  a|  b|
+---+---+
| a1| b1|
| a2| b2|
+---+---+

+---+---+
| b1| a1|
+---+---+
| b1| a1|
| b2| a2|
+---+---+

Вы можете создать любое выражение, которое хотите, и предоставить ему присоединение

val expression = df1("a") === df2("a1")
val result = df1 join (df2, expression)

result.show

выход

+---+---+---+---+
|  a|  b| b1| a1|
+---+---+---+---+
| a1| b1| b1| a1|
| a2| b2| b2| a2|
+---+---+---+---+

UPD:

Вы можете использовать createOrReplaceTempView Например

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

val res = spark.sql("select * from df1 inner join df2 on df1.a == df2.a1")
res.show

выход

+---+---+---+---+
|  a|  b| b1| a1|
+---+---+---+---+
| a1| b1| b1| a1|
| a2| b2| b2| a2|
+---+---+---+---+

Результат будет таким же, и вы можете предоставить SQL-запрос в виде строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...