[Набор данных Spark]: API CreateDataset завершается ошибкой с объектом Java, содержащим абстрактный класс - PullRequest
1 голос
/ 16 апреля 2020

Я пытаюсь преобразовать JavaRDD в набор данных с помощью функции createDataFrame(RDD<T> data, Encoder<T> evidence), но получаю следующую ошибку. Я использовал подмножество моего варианта использования.

Мой вариант использования: Я пытаюсь преобразовать JavaRDD со сложным вложенным объектом (с абстрактными классами) в набор данных, чтобы я мог написать данные в формате ORC / Parquet (JavaRDD не поддерживает ORC / Parquet)

Входные данные в формате Avro, существует бесконечная проблема рекурсии в createDataFrame для типов Avro, ссылаясь на этот https://issues.apache.org/jira/browse/SPARK-25789, поэтому я сначала загружаю данные в JavaRDD.

Требования : Encoders.kryo (), Encoders.javaSerialization () работает здесь, но я хочу использовать Encoders. bean ()

Encoders.bean (T) использует структуру объекта, чтобы обеспечить макет хранилища, определяемый классом c, так как я использую для хранения формат партера (столбчатое хранилище), каждая переменная класса может храниться в другом столбце, используя Encoders.bean (T), тогда как Encoders.Kryo (T) и EncodersjavaSerialization (T), эти кодировщики отображают T в одно массив байтов (двоичное). и, таким образом, сохраните объект в один столбец.

Если требуется специальный сериализатор, уточните решение.

Используемые классы:

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Map;

@AllArgsConstructor
@NoArgsConstructor
@lombok.Data
public class Data implements Serializable {
    private String id;
    private Map<Type, Aclass> data;
}


import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "uad")
@JsonSubTypes(value = {@JsonSubTypes.Type(name = "UADAffinity", value = Bclass.class)})
public abstract class Aclass implements Serializable {
    String t;
}



import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Bclass extends Aclass {
    private Map<String, String> data;
    public Bclass(String t, Map<String, String> data) {
        super(t);
        this.data = data;
    }
}

public enum Type {
    A, B;
}

Logi c:

import com.flipkart.ads.neo.schema.Type;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xyz.schema.Aclass;
import com.xyz.schema.Bclass;
import com.xyz.schema.Data;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.util.List;
import java.util.Map;

public class DataSetConverter {
    private SparkSession session;

    public DataSetConverter() {
        session = initSpark();
    }

    SparkSession initSpark() {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("123");
        return SparkSession.builder()
                .sparkContext(new SparkContext(conf))
                .getOrCreate();
    }

    public void dataset_test() {
        List<Data> dataList = prepareData();
        JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
        JavaRDD<Data> rowsrdd = jsc.parallelize(dataList);
        Dataset<Data> rows = session.createDataset(rowsrdd.rdd(), Encoders.bean(Data.class));
        System.out.println(rows.takeAsList(3));
    }

    public static void main(String[] args) {
        new DataSetConverter().dataset_test();
    }

    private List<Data> prepareData() {
        List<Data> dataList = Lists.newArrayList();
        Data sample1 = getData();
        Data sample2 = getData();
        dataList.add(sample1);
        dataList.add(sample2);
        return dataList;
    }

    private Data getData() {
        Map<Type, Aclass> data = getUadData("ppv");
        return new Data("123", data);
    }

    private Map<Type, Aclass> getUadData(String id) {
        Map<Type, Aclass> result = Maps.newHashMap();
        Map<String, String> data = Maps.newHashMap();
        data.put(id + "11", "11");
        result.put(Type.A, new Bclass("123", data));
        return result;
    }
}

Ошибка:

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 150, Column 11: Cannot instantiate abstract "com.xyz.schema.Aclass"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5260)
    at org.codehaus.janino.UnitCompiler.access$9800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitNewClassInstance(UnitCompiler.java:4433)
    at org.codehaus.janino.UnitCompiler$16.visitNewClassInstance(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:5179)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4703)
    at org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4418)
    at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$ConditionalExpression.accept(Java.java:4504)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
    at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
    at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
    at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)

Пожалуйста, помогите !!

...