Искра неправильная сериализация вложенных структур на группу-уменьшить по ключу - PullRequest
0 голосов
/ 04 марта 2019

Я хочу уменьшить кадр данных по ключу.Логика сокращения довольно сложна и требует обновления примерно 10-15 полей.Вот почему я хочу преобразовать DataFrame в DataSet и уменьшить количество Java POJO.

Проблема

Проблема в том, что после groupByKey-reduceByKey я получил несколько очень странных значений.Encoders.bean(Entity.class) читает правильные данные. См. Раздел «Пример кода» .

Обходные пути

Замена Encoders.bean на Encoders.kryo не работает, исключение:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.

Также я видел этот обходной путь , но Encoders.product требует TypeTag.Я не знаю, как создать TypeTag в коде Java.

Пример кода

    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.bean(Entity.class));

    // shows correct numbers
    ds.show(10, false);

    // correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A2         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A3         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+


    //after reduce shows wrong numbers
    ds
        .groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
        .reduceGroups((e1, e2) -> e1)
        .map(tuple -> tuple._2, Encoders.bean(Entity.class))
        .show(10, false);

    // wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+

Entity.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}

Storage.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private Timestamp timestamp;
    private Double storage;
}

testData.json:

[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "storage": 13.2
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 2,
    "storages": [
      {
        "timestamp": "2019-11-02 10:00:03.000",
        "storage": 1001.0
      }
    ]
  }
]

testSchema.json:

{
  "type": "struct",
  "fields": [
    {
      "name": "broker_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "server_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "order",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "storages",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "storage",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}

1 Ответ

0 голосов
/ 04 марта 2019

Это связано с тем, что десериализация использует структурное сопоставление для схемы, выведенной из Encoder, а классы бинов не имеют естественной структуры, поля схемы упорядочены по имени.

Так что если вы определяете класс бина, напримерваш Entity, схема, выведенная из компонента Encoder, будет

Encoders.bean(Storage.class).schema().printTreeString();
root
 |-- storage: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

не

root
 |-- timestamp: timestamp (nullable = true)
 |-- storage: double (nullable = true)

и это схема, которую следует использовать Dataset.Другими словами, схема, определенная как:

StructType schema = Encoders.bean(Entity.class).schema();

или

StructType schema = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<storage: double, timestamp: timestamp>>" 
);

, будет действительной и может использоваться для прямой загрузки testData:

Dataset<Entity> ds = spark.read()
  .option("multiline", "true")
  .schema(schema)
  .json("testData.json")
  .as(Encoders.bean(Entity.class));

в то время как ваша текущая схема, которая эквивалентна:

StructType valid = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<timestamp: timestamp, storage: double>>" 
);

- нет, несмотря на то, что она будет работать с читателем JSON, который (в отличие от Encoders) сопоставляет данные по имени.

Возможно, такое поведение следует указывать как ошибку - интуитивно не должно быть случая, когда Encoder сбрасывает данные, которые несовместимы с его собственнымилогика загрузки.

Связанный билет JIRA - SPARK-27050

...