Я пытаюсь преобразовать JavaRDD в Dataframe с помощью функции createDataFrame (JavaRDD rdd, Class beanclass), но получаю следующую ошибку. Я использовал следующий пример, потому что он присутствует в Spark Repo и соответствует моему варианту использования.
Мой вариант использования:
Я пытаюсь преобразовать JavaRDD с сложный вложенный объект (с перечислениями и многоуровневой хэш-картой) в Dataframes, чтобы я мог записывать данные в формате ORC / Parquet (JavaRDD не поддерживает ORC / Parquet)
Входные данные в формате Avro, там является бесконечной проблемой рекурсии в createDataFrame для типов Avro, ссылаясь на это https://issues.apache.org/jira/browse/SPARK-25789, то есть сначала я загружаю данные в JavaRDD.
Импорт:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import java.io.Serializable;
Pojo
public enum Gender implements Serializable {
M,
F;
}
@Getter
@Setter
public class Person implements Serializable {
private Gender gender;
private String name;
private int age;
}
Создание RDD
JavaRDD<Person> peopleRDD = spark.read()
.textFile("people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setGender(Gender.valueOf(parts[0]));
person.setName(parts[1]);
person.setAge(Integer.parseInt(parts[2].trim()));
return person;
});
Создание DataFrame из RDD
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
System.out.println(peopleDF.takeAsList(2).get(0));
Ошибка:
java.lang.IllegalArgumentException: The value (M) of the type (com.xyz.JavaSparkSQLExample.Gender) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
Я изменил пример, представленный в Spark Repo: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java