Каждый час я получаю некоторые обновления значений в качестве нового DataFrame.Я должен уменьшить DataFrames, чтобы дедуплицировать объекты и отслеживать историю обновлений значений.Поскольку логика редукции слишком сложна, я преобразую DataFrames в JavaRDD, сокращая и затем преобразуя JavaRDD обратно в DataFrame.
Проблема заключается в том, что мне нужно использовать вложенные структуры данных после сокращения.
Вопрос
Я прочитал , выводящий схему с помощью рефлексии ,но мне все еще не ясно:
Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?
Почему код Case 1не работает, в то время как дело 2 работает?
дело 1
Из следующего кода, который я получил:
scala.MatchError: History (timestamp = =1970-01-01 00: 00: 00.0, значение = 10.0) (класса com.somepackage.History)
Поэтому я могу заключить, что Spark не поддерживает вложенный массив bean-компонентов. Но см. Случай 2.
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Entity implements Serializable {
private Integer id;
private History[] history;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class History implements Serializable {
private Timestamp timestamp;
private Double value;
}
JavaRDD<Entity> rdd = JavaSparkContext
.fromSparkContext(spark().sparkContext())
.parallelize(asList(
new Entity(1, new History[] {
new History(new Timestamp(0L), 10.0)
})
));
spark()
//EXCEPTION HERE!
.createDataFrame(rdd, Entity.class)
.show();
Случай 2
С другой стороны, следующий код работает правильно с вложенными массивами bean-компонентов:
Dataset<Entity> dataSet = spark()
.read()
.option("multiLine", true).option("mode", "PERMISSIVE")
.schema(fromJson("/data/testSchema.json"))
.json(getAbsoluteFilePath("data/testData.json"))
.as(Encoders.bean(Entity.class));
JavaRDD<Entity> rdd = dataSet
.toJavaRDD()
.mapToPair(o -> tuple(RowFactory.create(o.getId()), o))
.reduceByKey((o1, o2) -> o2)
.values()
.saveAsTextFile("output.json");
-------
private String getAbsoluteFilePath(String relativePath) {
return this
.getClass()
.getClassLoader()
.getResource("")
.getPath() + relativePath;
}
private StructType fromJson(String pathToSchema) {
return (StructType) StructType.fromJson(
new BufferedReader(
new InputStreamReader(
Resources.class.getResourceAsStream(pathToSchema)
)
)
.lines()
.collect(Collectors.joining(System.lineSeparator()))
);
}
testData.json
[
{
"id": 1,
"history": [
{
"timestamp": "2018-10-29 23:11:44.000",
"value": 12.5
}
]
},
{
"id": 1,
"history": [
{
"timestamp": "2018-10-30 14:43:05.000",
"value": 13.2
}
]
}
]
testSchema.json
{
"type": "struct",
"fields": [
{
"name": "id",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "history",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "timestamp",
"type": "timestamp",
"nullable": true,
"metadata": {}
},
{
"name": "value",
"type": "double",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
},
"nullable": true,
"metadata": {}
}
]
}