Я хочу уменьшить кадр данных по ключу.Логика сокращения довольно сложна и требует обновления примерно 10-15 полей.Вот почему я хочу преобразовать DataFrame в DataSet и уменьшить количество Java POJO.
Проблема
Проблема в том, что после groupByKey-reduceByKey
я получил несколько очень странных значений.Encoders.bean(Entity.class)
читает правильные данные. См. Раздел «Пример кода» .
Обходные пути
Замена Encoders.bean
на Encoders.kryo
не работает, исключение:
Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
Также я видел этот обходной путь , но Encoders.product
требует TypeTag
.Я не знаю, как создать TypeTag
в коде Java.
Пример кода
Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
.as(Encoders.bean(Entity.class));
// shows correct numbers
ds.show(10, false);
// correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages |
+-----------+-----------+-----+-------------------------------+
|A1 |S1 |1 |[[2018-10-29 23:11:44, 12.5]] |
|A2 |S1 |1 |[[2018-10-30 14:43:05, 13.2]] |
|A3 |S1 |2 |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+
//after reduce shows wrong numbers
ds
.groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
.reduceGroups((e1, e2) -> e1)
.map(tuple -> tuple._2, Encoders.bean(Entity.class))
.show(10, false);
// wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages |
+-----------+-----+-----------+---------------------------------------------------------+
|A1 |2 |S1 |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1 |1 |S1 |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]] |
+-----------+-----+-----------+---------------------------------------------------------+
Entity.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
private String broker_name;
private String server_name;
private Integer order;
private Storage[] storages;
}
Storage.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
private Timestamp timestamp;
private Double storage;
}
testData.json:
[
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-29 23:11:44.000",
"storage": 12.5
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-30 14:43:05.000",
"storage": 13.2
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 2,
"storages": [
{
"timestamp": "2019-11-02 10:00:03.000",
"storage": 1001.0
}
]
}
]
testSchema.json:
{
"type": "struct",
"fields": [
{
"name": "broker_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "server_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "order",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "storages",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "timestamp",
"type": "timestamp",
"nullable": true,
"metadata": {}
},
{
"name": "storage",
"type": "double",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
},
"nullable": true,
"metadata": {}
}
]
}