Построение матрицы различий в Spark - PullRequest
0 голосов
/ 27 февраля 2019

Я пытаюсь построить матрицу различий, используя искру, и не понимаю, как это сделать оптимально.Я новичок в искре.Ниже приведен небольшой пример того, что я пытаюсь сделать.

Пример построения матрицы различий:

С учетом набора данных D:

+----+-----+------+-----+  
| id | a1  |  a2  | a3  |  
+----+-----+------+-----+  
|  1 | yes | high | on  |  
|  2 | no  | high | off |
|  3 | yes | low  | off |
+----+-----+------+-----+

и моего различиятаблица имеет вид

+-------+----+----+----+
| id,id | a1 | a2 | a3 |
+-------+----+----+----+
| 1,2   |  1 |  0 |  1 |
| 1,3   |  0 |  1 |  1 |
| 2,3   |  1 |  1 |  0 |
+-------+----+----+----+

, т. е. всякий раз, когда атрибут i полезен для различения пары кортежей, таблица различия имеет 1, в противном случае - 0.

Мои наборы данныхогромны, и я пытаюсь сделать это в искре. Ниже приводятся подходы, которые пришли мне в голову:

  1. использование вложенного цикла for для перебора всех членов RDD (набора данных)
  2. используя преобразование cartesian () поверх оригинального RDD и итерируя по всем членам результирующего RDD, чтобы получить таблицу различий.

Мои вопросы:
При первом подходе спарк автоматически оптимизирует вложенные для настройки цикла внутренниедля параллельной обработки?

При втором подходе использование cartesian () приводит к дополнительным затратам на хранение промежуточного RDD.Есть ли способ избежать этих накладных расходов на хранение и получить окончательную таблицу различий?

Какой из этих подходов лучше, и есть ли другой подход, который может быть полезен для эффективного построения матрицы различий (как пространства, так и времени)?

1 Ответ

0 голосов
/ 27 февраля 2019

Для этого кадра данных:

scala> val df = List((1, "yes", "high", "on" ), (2,  "no", "high", "off"), (3, "yes",  "low", "off") ).toDF("id", "a1", "a2", "a3")
df: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 2 more fields]

scala> df.show
+---+---+----+---+
| id| a1|  a2| a3|
+---+---+----+---+
|  1|yes|high| on|
|  2| no|high|off|
|  3|yes| low|off|
+---+---+----+---+

Мы можем построить декартово произведение, используя crossJoin с самим собой.Тем не менее, имена столбцов будут неоднозначными (я не знаю, как легко справиться с этим).Чтобы подготовиться к этому, давайте создадим второй фрейм данных:

scala> val df2 = df.toDF("id_2", "a1_2", "a2_2", "a3_2")
df2: org.apache.spark.sql.DataFrame = [id_2: int, a1_2: string ... 2 more fields]

scala> df2.show
+----+----+----+----+
|id_2|a1_2|a2_2|a3_2|
+----+----+----+----+
|   1| yes|high|  on|
|   2|  no|high| off|
|   3| yes| low| off|
+----+----+----+----+

В этом примере мы можем получить комбинации путем фильтрации с использованием id < id_2.

scala> val xp = df.crossJoin(df2)
xp: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 6 more fields]

scala> xp.show
+---+---+----+---+----+----+----+----+
| id| a1|  a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
|  1|yes|high| on|   1| yes|high|  on|
|  1|yes|high| on|   2|  no|high| off|
|  1|yes|high| on|   3| yes| low| off|
|  2| no|high|off|   1| yes|high|  on|
|  2| no|high|off|   2|  no|high| off|
|  2| no|high|off|   3| yes| low| off|
|  3|yes| low|off|   1| yes|high|  on|
|  3|yes| low|off|   2|  no|high| off|
|  3|yes| low|off|   3| yes| low| off|
+---+---+----+---+----+----+----+----+


scala> val filtered = xp.filter($"id" < $"id_2")
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, a1: string ... 6 more fields]

scala> filtered.show
+---+---+----+---+----+----+----+----+
| id| a1|  a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
|  1|yes|high| on|   2|  no|high| off|
|  1|yes|high| on|   3| yes| low| off|
|  2| no|high|off|   3| yes| low| off|
+---+---+----+---+----+----+----+----+

На данный момент проблема в основном решена.Чтобы получить финальную таблицу, мы можем использовать оператор when().otherwise() для каждой пары столбцов или UDF, как я сделал здесь:

scala> val dist = udf((a:String, b: String) => if (a != b) 1 else 0)
dist: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, StringType)))

scala> val distinction = filtered.select($"id", $"id_2", dist($"a1", $"a1_2").as("a1"), dist($"a2", $"a2_2").as("a2"), dist($"a3", $"a3_2").as("a3"))
distinction: org.apache.spark.sql.DataFrame = [id: int, id_2: int ... 3 more fields]

scala> distinction.show
+---+----+---+---+---+
| id|id_2| a1| a2| a3|
+---+----+---+---+---+
|  1|   2|  1|  0|  1|
|  1|   3|  0|  1|  1|
|  2|   3|  1|  1|  0|
+---+----+---+---+---+
...