Я запускаю некоторый нативный код из UDF Spark, используя JNA. В настоящее время я вручную сопоставляю их с Scala Case-Classes, которые должны быть возвращены из UDF (схема выводится автоматически). Это приводит к значительному дублированию кода, поскольку структуры данных практически идентичны.
Поэтому я объявил группу Java классов для сопоставления собственных структур данных (эти классы должны наследоваться от com.sun.jna.Structure
), они выглядеть так:
public class MyStruct extends com.sun.jna.Structure {
public double[] cp_O = new double[3];
public int B_HN;
public double B_Yo;
public double B_DY;
@Override
protected List getFieldOrder() {
return Arrays.asList("cp_O","B_HN","B_Yo","B_DY");
}
}
Далее, чтобы использовать org.apache.spark.sql.Encoders.bean
, я добавил геттер и сеттер для этого класса (чтобы он был компонентом) и попытался:
implicit val encoder = Encoders.bean(classOf[MyStruct])
// dummy udf without native call, just return a Bean
val myUDF = udf(() => new MyStruct(),encoder.schema)
ss.range(1)
.withColumn("result",myUDF())
.show()
Это дает:
Исключение в потоке "main" java .lang.NullPointerException в org.spark_project.guava.reflect.TypeToken.method (TypeToken. java: 465) в орг. apache .spark. sql .catalyst.JavaTypeInference $$ anonfun $ 2.apply (JavaTypeInference. scala: 126) в орг. apache .spark. sql .catalyst.JavaTypeInference $$ anonfun $ 2.apply (JavaTypeInference. scala: 125) в scala .collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike. scala: 244) в scala .collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike. scala: 244) в scala .collection.IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized. scala: 33) a t scala .collection.mutable.ArrayOps $ ofRef.foreach (ArrayOps. scala: 108) в scala .collection.TraversableLike $ class.map (TraversableLike. scala: 244) в scala .collection .mutable.ArrayOps $ ofRef.map (ArrayOps. scala: 108) в орг. apache .spark. sql .catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ катализатор $ JavaTypeInference $$ inferDataType ( JavaTypeInference. scala: 125) в орг. apache .spark. sql .catalyst.JavaTypeInference $ .inferDataType (JavaTypeInference. scala: 55) в орг. apache .spark. sql .catalyst .encoders.ExpressionEncoder $ .javaBean (ExpressionEncoder. scala: 89) в орг. apache .spark. sql .Encoders $ .bean (Кодировщики. scala: 142)
Если я удаляю extends Structure
из моего класса, код отлично работает для spark, но не для JNA. Таким образом, очевидно, что Structure
добавляет много полей к MyStruct
, одно из них не поддерживается / не может быть отображено. Поскольку в любом случае эти поля мне не нужны, как я могу сформулировать свой код так, чтобы кодировались только «мои» поля в MyStruct
, но не те, которые унаследованы от Structure
?