Поддержка Spark SQL для вложенных массивов и бинов - PullRequest
0 голосов
/ 01 марта 2019

Каждый час я получаю некоторые обновления значений в качестве нового DataFrame.Я должен уменьшить DataFrames, чтобы дедуплицировать объекты и отслеживать историю обновлений значений.Поскольку логика редукции слишком сложна, я преобразую DataFrames в JavaRDD, сокращая и затем преобразуя JavaRDD обратно в DataFrame.

Проблема заключается в том, что мне нужно использовать вложенные структуры данных после сокращения.

Вопрос

Я прочитал , выводящий схему с помощью рефлексии ,но мне все еще не ясно:

Поддерживает ли Spark SQL только вложенные массивы примитивов или вложенные массивы bean-компонентов?

Почему код Case 1не работает, в то время как дело 2 работает?

дело 1

Из следующего кода, который я получил:

scala.MatchError: History (timestamp = =1970-01-01 00: 00: 00.0, значение = 10.0) (класса com.somepackage.History)

Поэтому я могу заключить, что Spark не поддерживает вложенный массив bean-компонентов. Но см. Случай 2.

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Entity implements Serializable {

    private Integer id;
    private History[] history;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class History implements Serializable {

    private Timestamp timestamp;
    private Double value;
}

JavaRDD<Entity> rdd = JavaSparkContext
    .fromSparkContext(spark().sparkContext())
    .parallelize(asList(
        new Entity(1, new History[] {
           new History(new Timestamp(0L), 10.0)
        })
    ));
spark()
    //EXCEPTION HERE!
    .createDataFrame(rdd, Entity.class)
    .show();

Случай 2

С другой стороны, следующий код работает правильно с вложенными массивами bean-компонентов:

Dataset<Entity> dataSet = spark()
    .read()
    .option("multiLine", true).option("mode", "PERMISSIVE")
    .schema(fromJson("/data/testSchema.json"))
    .json(getAbsoluteFilePath("data/testData.json"))
    .as(Encoders.bean(Entity.class));

JavaRDD<Entity> rdd = dataSet
    .toJavaRDD()
    .mapToPair(o -> tuple(RowFactory.create(o.getId()), o))
    .reduceByKey((o1, o2) -> o2)
    .values()
    .saveAsTextFile("output.json");

-------

private String getAbsoluteFilePath(String relativePath) {
    return this
        .getClass()
        .getClassLoader()
        .getResource("")
        .getPath() + relativePath;
}

private StructType fromJson(String pathToSchema) {
    return (StructType) StructType.fromJson(
        new BufferedReader(
            new InputStreamReader(
                Resources.class.getResourceAsStream(pathToSchema)
            )
        )
            .lines()
            .collect(Collectors.joining(System.lineSeparator()))
    );
}

testData.json

[
  {
    "id": 1,
    "history": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "value": 12.5
      }
    ]
  },
  {
    "id": 1,
    "history": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "value": 13.2
      }
    ]
  }
]

testSchema.json

{
  "type": "struct",
  "fields": [
    {
      "name": "id",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "history",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "value",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...