Ошибка при преобразовании JavaRDD в Dataframe - PullRequest
1 голос
/ 12 апреля 2020

Я пытаюсь преобразовать JavaRDD в Dataframe с помощью функции createDataFrame (JavaRDD rdd, Class beanclass), но получаю следующую ошибку. Я использовал следующий пример, потому что он присутствует в Spark Repo и соответствует моему варианту использования.

Мой вариант использования:

Я пытаюсь преобразовать JavaRDD с сложный вложенный объект (с перечислениями и многоуровневой хэш-картой) в Dataframes, чтобы я мог записывать данные в формате ORC / Parquet (JavaRDD не поддерживает ORC / Parquet)

Входные данные в формате Avro, там является бесконечной проблемой рекурсии в createDataFrame для типов Avro, ссылаясь на это https://issues.apache.org/jira/browse/SPARK-25789, то есть сначала я загружаю данные в JavaRDD.


Импорт:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import java.io.Serializable;

Pojo

public enum Gender implements Serializable {
        M,
        F;
    }

    @Getter
    @Setter
    public class Person implements Serializable {
        private Gender gender;
        private String name;
        private int age;
    }

Создание RDD

 JavaRDD<Person> peopleRDD = spark.read()
                .textFile("people.txt")
                .javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    Person person = new Person();
                    person.setGender(Gender.valueOf(parts[0]));
                    person.setName(parts[1]);
                    person.setAge(Integer.parseInt(parts[2].trim()));
                    return person;
                });

Создание DataFrame из RDD

        // Apply a schema to an RDD of JavaBeans to get a DataFrame
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
        // Register the DataFrame as a temporary view
        System.out.println(peopleDF.takeAsList(2).get(0));

Ошибка:

java.lang.IllegalArgumentException: The value (M) of the type (com.xyz.JavaSparkSQLExample.Gender) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

Я изменил пример, представленный в Spark Repo: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java

...