Я пытаюсь преобразовать JavaRDD в набор данных с помощью функции createDataFrame(RDD<T> data, Encoder<T> evidence)
, но получаю следующую ошибку. Я использовал подмножество моего варианта использования.
Мой вариант использования: Я пытаюсь преобразовать JavaRDD со сложным вложенным объектом (с абстрактными классами) в набор данных, чтобы я мог написать данные в формате ORC / Parquet (JavaRDD не поддерживает ORC / Parquet)
Входные данные в формате Avro, существует бесконечная проблема рекурсии в createDataFrame для типов Avro, ссылаясь на этот https://issues.apache.org/jira/browse/SPARK-25789, поэтому я сначала загружаю данные в JavaRDD.
Требования : Encoders.kryo (), Encoders.javaSerialization () работает здесь, но я хочу использовать Encoders. bean ()
Encoders.bean (T) использует структуру объекта, чтобы обеспечить макет хранилища, определяемый классом c, так как я использую для хранения формат партера (столбчатое хранилище), каждая переменная класса может храниться в другом столбце, используя Encoders.bean (T), тогда как Encoders.Kryo (T) и EncodersjavaSerialization (T), эти кодировщики отображают T в одно массив байтов (двоичное). и, таким образом, сохраните объект в один столбец.
Если требуется специальный сериализатор, уточните решение.
Используемые классы:
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Map;
@AllArgsConstructor
@NoArgsConstructor
@lombok.Data
public class Data implements Serializable {
private String id;
private Map<Type, Aclass> data;
}
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "uad")
@JsonSubTypes(value = {@JsonSubTypes.Type(name = "UADAffinity", value = Bclass.class)})
public abstract class Aclass implements Serializable {
String t;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Bclass extends Aclass {
private Map<String, String> data;
public Bclass(String t, Map<String, String> data) {
super(t);
this.data = data;
}
}
public enum Type {
A, B;
}
Logi c:
import com.flipkart.ads.neo.schema.Type;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xyz.schema.Aclass;
import com.xyz.schema.Bclass;
import com.xyz.schema.Data;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import java.util.List;
import java.util.Map;
public class DataSetConverter {
private SparkSession session;
public DataSetConverter() {
session = initSpark();
}
SparkSession initSpark() {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("123");
return SparkSession.builder()
.sparkContext(new SparkContext(conf))
.getOrCreate();
}
public void dataset_test() {
List<Data> dataList = prepareData();
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
JavaRDD<Data> rowsrdd = jsc.parallelize(dataList);
Dataset<Data> rows = session.createDataset(rowsrdd.rdd(), Encoders.bean(Data.class));
System.out.println(rows.takeAsList(3));
}
public static void main(String[] args) {
new DataSetConverter().dataset_test();
}
private List<Data> prepareData() {
List<Data> dataList = Lists.newArrayList();
Data sample1 = getData();
Data sample2 = getData();
dataList.add(sample1);
dataList.add(sample2);
return dataList;
}
private Data getData() {
Map<Type, Aclass> data = getUadData("ppv");
return new Data("123", data);
}
private Map<Type, Aclass> getUadData(String id) {
Map<Type, Aclass> result = Maps.newHashMap();
Map<String, String> data = Maps.newHashMap();
data.put(id + "11", "11");
result.put(Type.A, new Bclass("123", data));
return result;
}
}
Ошибка:
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 150, Column 11: Cannot instantiate abstract "com.xyz.schema.Aclass"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5260)
at org.codehaus.janino.UnitCompiler.access$9800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitNewClassInstance(UnitCompiler.java:4433)
at org.codehaus.janino.UnitCompiler$16.visitNewClassInstance(UnitCompiler.java:4394)
at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:5179)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4703)
at org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4418)
at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4394)
at org.codehaus.janino.Java$ConditionalExpression.accept(Java.java:4504)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
Пожалуйста, помогите !!