Как отобразить элементы массива на каждую запись в кадре данных Spark - PullRequest
0 голосов
/ 08 ноября 2019

Я работаю над фреймом данных, который выглядит следующим образом -

val df = Seq(
(0.0  ),
(0.0  ),
(0.0  ),
(0.317),
(0.0  ),
(0.0  ),
(-0.78),
(-0.37),
(0.0  ),
(0.0  ),
(0.0  ),
(0.0  )
).toDF("importance")

Теперь у меня есть еще немного кода для получения столбцов labels и features в виде массивов, как показано ниже -

val labels = Array(0,1,2)
import org.apache.spark.sql.functions.typedLit
val df1 = df.withColumn("labels", typedLit(labels))
val featureNames = Array("a","b","c","d")
val df2 = df1.withColumn("features", typedLit(featureNames))

scala> df2.show(false)
+----------+---------+------------+
|importance|labels   |features    |
+----------+---------+------------+
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.317     |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|-0.78     |[0, 1, 2]|[a, b, c, d]|
|-0.37     |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
|0.0       |[0, 1, 2]|[a, b, c, d]|
+----------+---------+------------+

Теперь, используя этот фрейм данных, я хочу выровнять каждый столбец значения важности с каждым элементом массивов labels и features. Таким образом, вывод должен выглядеть примерно так -

label feature name  importance
0         a             0      
0         b             0      
0         c             0      
0         d             0.3176 
1         a             0      
1         b             0      
1         c             -0.78  
1         d             -0.37  
2         a             0      
2         b             0      
2         c             0      
2         d             0  

Итак, первая запись имеет label=0 и feature=a и имеет importance = 0.

1 Ответ

0 голосов
/ 08 ноября 2019

На основании вашего примера недостаточно информации, чтобы у меня было детерминированное решение. Поскольку spark - это механизм распределенной обработки, вам понадобится детерминированный способ сортировки importance фрейма данных для достижения необходимого результата.

Я считаю, что для получения нужного результата необходимо изменить подход:

  1. CROSS JOIN label и feature в labelFeatureDs
  2. Добавьте столбец rownumber на основе необходимой вам сортировки к labelFeatureDs (в вашем примере сортировкаэто на label, затем feature)
  3. Добавить rownumber к importance в зависимости от того, какая сортировка у вас здесь
  4. INNER JOIN importance к labelFeatureDs на rownumber

Код:

import org.apache.spark.sql.expressions.Window
spark.conf.set("spark.sql.crossJoin.enabled", true)

val importanceDf = Seq(
(0.0  ),
(0.0  ),
(0.0  ),
(0.317),
(0.0  ),
(0.0  ),
(-0.78),
(-0.37),
(0.0  ),
(0.0  ),
(0.0  ),
(0.0  )
).
toDF("importance").
select(col("*"), row_number().over(Window.partitionBy(lit(null)).orderBy(lit(null))).as("rn"))
importanceDf.show

val labelsDf = Seq(0,1,2).toDF("label")
val featuresDf = Seq("a","b","c","d").toDF("feature")

val labelFeatureDs = labelsDf.join(featuresDf).sort($"label", $"feature").select(col("*"), row_number().over(Window.partitionBy(lit(null)).orderBy(lit(null))).as("rn"))
labelFeatureDs.show

val result = labelFeatureDs.join(importanceDf, "rn")

Результат:

scala> result.show
+---+-----+-------+----------+
| rn|label|feature|importance|
+---+-----+-------+----------+
|  1|    0|      a|       0.0|
|  2|    0|      b|       0.0|
|  3|    0|      c|       0.0|
|  4|    0|      d|     0.317|
|  5|    1|      a|       0.0|
|  6|    1|      b|       0.0|
|  7|    1|      c|     -0.78|
|  8|    1|      d|     -0.37|
|  9|    2|      a|       0.0|
| 10|    2|      b|       0.0|
| 11|    2|      c|       0.0|
| 12|    2|      d|       0.0|
+---+-----+-------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...