Преобразовать массив в DenseVector в Spark DataFrame с помощью Java - PullRequest
0 голосов
/ 22 октября 2018

Я использую Spark 2.3.Я хочу преобразовать столбец features в следующем фрейме данных из ArrayType в DenseVector.Я использую Spark в Java.

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[4.191401, -1.793...|
| 10|[-0.5674514, -1.3...|
| 20|[0.735613, -0.026...|
| 30|[-0.030161237, 0....|
| 40|[-0.038345724, -0...|
+---+--------------------+

root
 |-- id: integer (nullable = false)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = false)

Я написал следующее UDF, но, похоже, оно не работает:

private static UDF1 toVector = new UDF1<Float[], Vector>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Float[] t1) throws Exception {

        double[] DoubleArray = new double[t1.length];
        for (int i = 0 ; i < t1.length; i++)
        {
            DoubleArray[i] = (double) t1[i];
        }   
    Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);
    return vector;
    }
}

Я хочу извлечь следующие функциикак вектор, чтобы я мог выполнить кластеризацию на нем.

Я также регистрирую UDF и затем продолжаю вызывать его следующим образом:

spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);
df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("feautres")));
df3.show();  

При запуске этого фрагмента я сталкиваюсьследующая ошибка:

ReadProcessData $ 1 не может быть приведен к org.apache.spark.sql.expressions.UserDefinedAggregateFunction

1 Ответ

0 голосов
/ 23 октября 2018

Проблема заключается в том, как вы регистрируете udf в Spark.Вы не должны использовать UserDefinedAggregateFunction, который не udf, а udaf, используемый для агрегации.Вместо этого вы должны сделать следующее:

spark.udf().register("toVector", toVector, new VectorUDT());

Затем, чтобы использовать зарегистрированную функцию, используйте:

df3.withColumn("featuresnew", callUDF("toVector",df3.col("feautres")));

Сам udf должен быть слегка отрегулирован следующим образом:

UDF1 toVector = new UDF1<Seq<Float>, Vector>(){

  public Vector call(Seq<Float> t1) throws Exception {

    List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
    double[] DoubleArray = new double[t1.length()]; 
    for (int i = 0 ; i < L.size(); i++) { 
      DoubleArray[i]=L.get(i); 
    } 
    return Vectors.dense(DoubleArray); 
  } 
};

Обратите внимание, что в Spark 2.3 + вы можете создать стиль scala udf, который можно вызывать напрямую.Из этого ответа :

UserDefinedFunction toVector = udf(
  (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);

df3.withColumn("featuresnew", toVector.apply(col("feautres")));
...