Я пытаюсь преобразовать список пользовательских объектов Java в пользовательский типизированный набор данных.
JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams)
);
JavaDStream<String> telemetryValueStream = telemetryStream.map(consumerRecord -> consumerRecord.value());
telemetryValueStream.foreachRDD(eachPartition -> {
if(!eachPartition.isEmpty()){
List<Tuple> tupleList = new ArrayList<>();
eachPartition.foreach(record -> {
ObjectMapper om = new ObjectMapper();
JsonNode dataNode = om.readTree(record);
Tuple tupled = new Tuple();
tupled.setKey(dataNode.at("/Telemetry/encoding_path").asText());
tupled.setValue(dataNode);
tupleList.add(tupled);
});
Encoder<Tuple> encoder = Encoders.bean(Tuple.class);
Dataset<Tuple> tupledDS = JavaSparkSessionSingleton.getInstance(eachPartition.context().getConf()).createDataset(tupleList,encoder);
tupledDS.printSchema();
}
});
Это приводит к ошибке java.lang.StackOverflowError (это Error, а не Exception — см. трассировку стека ниже).
Exception in thread "streaming-job-executor-0" java.lang.StackOverflowError
at sun.reflect.generics.reflectiveObjects.TypeVariableImpl.equals(TypeVariableImpl.java:189)
at org.spark_project.guava.collect.SingletonImmutableBiMap.get(SingletonImmutableBiMap.java:53)
at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:207)
at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:197)
at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:157)
at org.spark_project.guava.reflect.TypeResolver.resolveParameterizedType(TypeResolver.java:229)
at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:159)
at org.spark_project.guava.reflect.TypeToken.resolveType(TypeToken.java:268)
at org.spark_project.guava.reflect.TypeToken.resolveSupertype(TypeToken.java:279)
at org.spark_project.guava.reflect.TypeToken.getSupertype(TypeToken.java:401)
at org.apache.spark.sql.catalyst.JavaTypeInference$.elementType(JavaTypeInference.scala:157)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
Вот используемый bean-класс для справки:
public class Tuple implements Serializable {
private String key;
private JsonNode value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public JsonNode getValue() {
return value;
}
public void setValue(JsonNode value) {
this.value = value;
}
}