Преобразование списка пользовательских объектов Java в набор данных дает ошибку StackOverflowError в Spark
0 голосов
/ 21 сентября 2019

Я пытаюсь преобразовать список пользовательских объектов Java в пользовательский типизированный набор данных.

JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaParams)
                );

        JavaDStream<String> telemetryValueStream = telemetryStream.map(consumerRecord -> consumerRecord.value());

        telemetryValueStream.foreachRDD(eachPartition -> {
            if(!eachPartition.isEmpty()){
                List<Tuple> tupleList = new ArrayList<>();
                eachPartition.foreach(record -> {
                    ObjectMapper om = new ObjectMapper();
                    JsonNode dataNode = om.readTree(record);
                    Tuple tupled = new Tuple();
                    tupled.setKey(dataNode.at("/Telemetry/encoding_path").asText());
                    tupled.setValue(dataNode);
                    tupleList.add(tupled);
                });
                Encoder<Tuple> encoder = Encoders.bean(Tuple.class);
                Dataset<Tuple> tupledDS = JavaSparkSessionSingleton.getInstance(eachPartition.context().getConf()).createDataset(tupleList,encoder);
                tupledDS.printSchema();
            }
        });

Это приводит к ошибке java.lang.StackOverflowError

Exception in thread "streaming-job-executor-0" java.lang.StackOverflowError
    at sun.reflect.generics.reflectiveObjects.TypeVariableImpl.equals(TypeVariableImpl.java:189)
    at org.spark_project.guava.collect.SingletonImmutableBiMap.get(SingletonImmutableBiMap.java:53)
    at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:207)
    at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:197)
    at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:157)
    at org.spark_project.guava.reflect.TypeResolver.resolveParameterizedType(TypeResolver.java:229)
    at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:159)
    at org.spark_project.guava.reflect.TypeToken.resolveType(TypeToken.java:268)
    at org.spark_project.guava.reflect.TypeToken.resolveSupertype(TypeToken.java:279)
    at org.spark_project.guava.reflect.TypeToken.getSupertype(TypeToken.java:401)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.elementType(JavaTypeInference.scala:157)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)

Вот класс компонента для вашей справки

/**
 * Mutable (key, value) JavaBean used to build a typed {@code Dataset<Tuple>}
 * from telemetry JSON records.
 *
 * <p>{@code key} holds the telemetry encoding path (read from
 * {@code /Telemetry/encoding_path}); {@code value} holds the parsed JSON tree.
 */
public class Tuple implements Serializable {

    // Explicit id: instances of this class are serialized to Spark executors,
    // so the serialized form should be stable across recompiles.
    private static final long serialVersionUID = 1L;

    private String key;

    // NOTE(review): Encoders.bean(Tuple.class) cannot handle a JsonNode property —
    // Spark's JavaTypeInference recurses forever over JsonNode's self-referential
    // bean getters (the reported StackOverflowError). If this bean is used with
    // Encoders.bean, prefer storing the JSON as a String or a concrete POJO.
    private JsonNode value;

    /** @return the telemetry encoding path used as the record key */
    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    /** @return the parsed JSON payload for this record */
    public JsonNode getValue() {
        return value;
    }

    public void setValue(JsonNode value) {
        this.value = value;
    }

}
...