Кодер Spark Bean для Java классов, расширяющих com.sun.jna. Структура - PullRequest
3 голосов
/ 06 марта 2020

Я запускаю некоторый нативный код из UDF Spark, используя JNA. В настоящее время я вручную сопоставляю их с Scala Case-Classes, которые должны быть возвращены из UDF (схема выводится автоматически). Это приводит к значительному дублированию кода, поскольку структуры данных практически идентичны.

Поэтому я объявил группу Java классов для сопоставления собственных структур данных (эти классы должны наследоваться от com.sun.jna.Structure), они выглядеть так:

public class MyStruct extends com.sun.jna.Structure {

  public double[] cp_O = new double[3];
  public int B_HN;
  public double B_Yo;
  public double B_DY;

  @Override
  protected List getFieldOrder() {
    return Arrays.asList("cp_O","B_HN","B_Yo","B_DY");
}

}

Далее, чтобы использовать org.apache.spark.sql.Encoders.bean, я добавил геттер и сеттер для этого класса (чтобы он был компонентом) и попытался:

implicit val encoder = Encoders.bean(classOf[MyStruct])

// dummy udf without native call, just return a Bean
val myUDF = udf(() => new MyStruct(),encoder.schema)

ss.range(1)
  .withColumn("result",myUDF())
  .show()

Это дает:

Исключение в потоке "main" java .lang.NullPointerException в org.spark_project.guava.reflect.TypeToken.method (TypeToken. java: 465) в орг. apache .spark. sql .catalyst.JavaTypeInference $$ anonfun $ 2.apply (JavaTypeInference. scala: 126) в орг. apache .spark. sql .catalyst.JavaTypeInference $$ anonfun $ 2.apply (JavaTypeInference. scala: 125) в scala .collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike. scala: 244) в scala .collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike. scala: 244) в scala .collection.IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized. scala: 33) a t scala .collection.mutable.ArrayOps $ ofRef.foreach (ArrayOps. scala: 108) в scala .collection.TraversableLike $ class.map (TraversableLike. scala: 244) в scala .collection .mutable.ArrayOps $ ofRef.map (ArrayOps. scala: 108) в орг. apache .spark. sql .catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ катализатор $ JavaTypeInference $$ inferDataType ( JavaTypeInference. scala: 125) в орг. apache .spark. sql .catalyst.JavaTypeInference $ .inferDataType (JavaTypeInference. scala: 55) в орг. apache .spark. sql .catalyst .encoders.ExpressionEncoder $ .javaBean (ExpressionEncoder. scala: 89) в орг. apache .spark. sql .Encoders $ .bean (Кодировщики. scala: 142)

Если я удаляю extends Structure из моего класса, код отлично работает для spark, но не для JNA. Таким образом, очевидно, что Structure добавляет много полей к MyStruct, одно из них не поддерживается / не может быть отображено. Поскольку в любом случае эти поля мне не нужны, как я могу сформулировать свой код так, чтобы кодировались только «мои» поля в MyStruct, но не те, которые унаследованы от Structure?

...