объект столбца scala в один столбец данных - PullRequest
1 голос
/ 25 сентября 2019

У меня есть такой фрейм данных:

      val df = Seq(
      ("a", Seq(2.0)),
      ("a", Seq(1.0)),
      ("a", Seq(0.5)),
      ("b", Seq(24.0)),
      ("b", Seq(12.5)),
      ("b", Seq(6.4)),
      ("b", Seq(3.2)),
      ("c", Seq(104.0)),
      ("c", Seq(107.4))
    ).toDF("key", "value")

Мне нужно использовать алгоритм, который принимает входные данные объекта DataFrame в разных группах.Чтобы сделать это более понятным, предположим, что я должен использовать StandardScaler для масштабирования по группам.

В пандах я хотел бы сделать что-то подобное (много типов изменений в процессе):

from sklearn.preprocessing import StandardScaler
       df.groupby(key) \
       .value \
       .transform(lambda x: StandardScaler \
       .fit_transform(x \
       .values \
       .reshape(-1,1)) \
       .reshape(-1))

Мне нужносделать это в Scala, потому что алгоритм, который мне нужен, это не Scaler, а еще одна вещь, встроенная в Scala.

До сих пор я пытался сделать что-то вроде этого:

import org.apache.spark.ml.feature.StandardScaler
def f(X : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = {  
      val scaler = new StandardScaler()
        .setInputCol("value")
        .setOutputCol("scaled")

      val output = scaler.fit(X)("scaled")

      (output)

    }

    df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))

но, конечно, это дает мне ошибку:

command-144174313464261: 21: ошибка: несоответствие типов;найдено: org.apache.spark.sql.Column требуется: org.apache.spark.sql.Dataset [_] val output = scaler.fit (X) ("масштабированный")

Итак, япытаюсь превратить один объект Column в объект DataFrame, но безуспешно.Как мне это сделать?

Если это невозможно, есть ли способ решить эту проблему?

ОБНОВЛЕНИЕ 1

Кажется, я сделал некоторыеошибки в коде, я пытался это исправить (я думаю, что сделал правильно):

      val df = Seq(
      ("a", 2.0),
      ("a", 1.0),
      ("a", 0.5),
      ("b", 24.0),
      ("b", 12.5),
      ("b", 6.4),
      ("b", 3.2),
      ("c", 104.0),
      ("c", 107.4)
    ).toDF("key", "value")


    def f(X : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.Column = {  
         val assembler = new VectorAssembler()
        .setInputCols(Array("value"))
        .setOutputCol("feature")
          val scaler = new StandardScaler()
        .setInputCol("feature")
        .setOutputCol("scaled")
         val pipeline = new Pipeline()
        .setStages(Array(assembler, scaler))
         val output = pipeline.fit(X).transform(X)("scaled")

      (output)
    }  

    someDF.withColumn("scaled_values", f(someDF).over(Window.partitionBy("key")))

Я все еще получаю ошибку:

org.apache.spark.sql.AnalysisException: выражение 'scaled # 1294' не поддерживается внутри оконной функции. ;;

Я не уверен в причине этой ошибки, я попытался наложить псевдоним на столбец, но он не работает.

1 Ответ

2 голосов
/ 25 сентября 2019

Итак, я пытаюсь преобразовать один объект Column в объект DataFrame, но безуспешно.Как мне это сделать?

Вы не можете, Column просто ссылается на Column DataFrame, он не содержит никаких данных, это не структура данных, как DataFrame.

Ваша функция f также не будет работать так.Если вы не хотите создавать пользовательскую функцию для использования с Window, то вам нужен UDAF (User-Defined-Aggregation-Function), что довольно сложно ...

В вашем случае,Я хотел бы в groupBy key, собирать список ваших значений, а затем применить UDF для масштабирования.Обратите внимание, что это работает, только если данные на ключ не слишком велики (больше, чем у 1 исполнителя), в противном случае вам понадобится UDAF

Вот пример:

// example scala method, scale to 0-1
def myScaler(data:Seq[Double]) = {
  val mi = data.min
  val ma = data.max
  data.map(x => (x-mi)/(ma-mi))
}

val udf_myScaler = udf(myScaler _)

df
  .groupBy($"key")
  .agg(
    collect_list($"value").as("values")
  )
  .select($"key",explode(arrays_zip($"values",udf_myScaler($"values"))))
  .select($"key",$"col.values",$"col.1".as("values_scaled"))
  .show()

дает:

+---+------+-------------------+
|key|values|      values_scaled|
+---+------+-------------------+
|  c| 104.0|                0.0|
|  c| 107.4|                1.0|
|  b|  24.0|                1.0|
|  b|  12.5|0.44711538461538464|
|  b|   6.4|0.15384615384615385|
|  b|   3.2|                0.0|
|  a|   2.0|                1.0|
|  a|   1.0| 0.3333333333333333|
|  a|   0.5|                0.0|
+---+------+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...