Проблема заключается в том, как вы регистрируете udf
в Spark.Вы не должны использовать UserDefinedAggregateFunction
, который не udf
, а udaf
, используемый для агрегации.Вместо этого вы должны сделать следующее:
spark.udf().register("toVector", toVector, new VectorUDT());
Затем, чтобы использовать зарегистрированную функцию, используйте:
df3.withColumn("featuresnew", callUDF("toVector",df3.col("feautres")));
Сам udf
должен быть слегка отрегулирован следующим образом:
UDF1 toVector = new UDF1<Seq<Float>, Vector>(){
public Vector call(Seq<Float> t1) throws Exception {
List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
double[] DoubleArray = new double[t1.length()];
for (int i = 0 ; i < L.size(); i++) {
DoubleArray[i]=L.get(i);
}
return Vectors.dense(DoubleArray);
}
};
Обратите внимание, что в Spark 2.3 + вы можете создать стиль scala udf
, который можно вызывать напрямую.Из этого ответа :
UserDefinedFunction toVector = udf(
(Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);
df3.withColumn("featuresnew", toVector.apply(col("feautres")));