Для этого кадра данных:
scala> val df = List((1, "yes", "high", "on" ), (2, "no", "high", "off"), (3, "yes", "low", "off") ).toDF("id", "a1", "a2", "a3")
df: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 2 more fields]
scala> df.show
+---+---+----+---+
| id| a1| a2| a3|
+---+---+----+---+
| 1|yes|high| on|
| 2| no|high|off|
| 3|yes| low|off|
+---+---+----+---+
Мы можем построить декартово произведение, используя crossJoin
с самим собой.Тем не менее, имена столбцов будут неоднозначными (я не знаю, как легко справиться с этим).Чтобы подготовиться к этому, давайте создадим второй фрейм данных:
scala> val df2 = df.toDF("id_2", "a1_2", "a2_2", "a3_2")
df2: org.apache.spark.sql.DataFrame = [id_2: int, a1_2: string ... 2 more fields]
scala> df2.show
+----+----+----+----+
|id_2|a1_2|a2_2|a3_2|
+----+----+----+----+
| 1| yes|high| on|
| 2| no|high| off|
| 3| yes| low| off|
+----+----+----+----+
В этом примере мы можем получить комбинации путем фильтрации с использованием id < id_2
.
scala> val xp = df.crossJoin(df2)
xp: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 6 more fields]
scala> xp.show
+---+---+----+---+----+----+----+----+
| id| a1| a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
| 1|yes|high| on| 1| yes|high| on|
| 1|yes|high| on| 2| no|high| off|
| 1|yes|high| on| 3| yes| low| off|
| 2| no|high|off| 1| yes|high| on|
| 2| no|high|off| 2| no|high| off|
| 2| no|high|off| 3| yes| low| off|
| 3|yes| low|off| 1| yes|high| on|
| 3|yes| low|off| 2| no|high| off|
| 3|yes| low|off| 3| yes| low| off|
+---+---+----+---+----+----+----+----+
scala> val filtered = xp.filter($"id" < $"id_2")
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, a1: string ... 6 more fields]
scala> filtered.show
+---+---+----+---+----+----+----+----+
| id| a1| a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
| 1|yes|high| on| 2| no|high| off|
| 1|yes|high| on| 3| yes| low| off|
| 2| no|high|off| 3| yes| low| off|
+---+---+----+---+----+----+----+----+
На данный момент проблема в основном решена.Чтобы получить финальную таблицу, мы можем использовать оператор when().otherwise()
для каждой пары столбцов или UDF, как я сделал здесь:
scala> val dist = udf((a:String, b: String) => if (a != b) 1 else 0)
dist: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, StringType)))
scala> val distinction = filtered.select($"id", $"id_2", dist($"a1", $"a1_2").as("a1"), dist($"a2", $"a2_2").as("a2"), dist($"a3", $"a3_2").as("a3"))
distinction: org.apache.spark.sql.DataFrame = [id: int, id_2: int ... 3 more fields]
scala> distinction.show
+---+----+---+---+---+
| id|id_2| a1| a2| a3|
+---+----+---+---+---+
| 1| 2| 1| 0| 1|
| 1| 3| 0| 1| 1|
| 2| 3| 1| 1| 0|
+---+----+---+---+---+