Преобразовать строку во вложенный JSON в Spark - PullRequest
1 голос
/ 10 мая 2019

Я пытаюсь преобразовать строку во вложенную структуру JSON в Spark.

Строки загружаются из файла значений, разделенных запятыми:

identifier,timestamp,x,y
2,         456,      1,x
1,         456,      1,y
1,         123,      0,x
1,         789,      0,z

Строки должны быть преобразованы в следующий формат JSON (сгруппированы по идентификатору и отсортированы по отметке времени):

{"identifier":"1","events":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}

{"identifier":"2","events":[{"timestamp":"456","properties":{"x":"0","y":"z"}}]}

На данный момент мне удалось преобразовать данные в ...

{"identifier":"1","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"123","properties":{"x":"0","y":"x"}},{"timestamp":"456","properties":{"x":"1","y":"y"}},{"timestamp":"789","properties":{"x":"0","y":"z"}}]}

{"identifier":"2","collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)":[{"timestamp":"456","properties":{"x":"0","y":"z"}}]}

используя следующий код:

public static void main(final String[] args) {
    final Column properties = struct(col("x").as("x"), col("y").as("y")).as("properties");
    final Column event = struct(col("timestamp").as("timestamp"), properties).as("events");

    final SparkSession sparkSession = SparkSession.builder().getOrCreate();
    final Dataset<Row> events = sparkSession.read().option("header", "true").csv("/input/events").sort(col("identifier").asc(), col("timestamp").asc());
    Dataset<String> groupedEvents = events.groupBy("identifier").agg(collect_list(event)).toJSON();
    groupedEvents.write().text("/output/events");
    sparkSession.stop();
}

Однако это результирующее преобразование также включает ...

"collect_list(named_struct(NamePlaceholder(), timestamp AS `timestamp`, NamePlaceholder(), named_struct(NamePlaceholder(), x AS `x`, NamePlaceholder(), y AS `y`) AS `properties`) AS `events`)

, который я хотел бы приравнять к «событиям».

Как произвести описанное преобразование?

1 Ответ

0 голосов
/ 11 мая 2019

Наконец-то справился сам, используя следующее:

public static void main(final String[] args) {
    final SparkSession sparkSession = SparkSession.builder().getOrCreate();
    final Dataset<Row> events = sparkSession.read().option("header", "true").csv("/input/events");
    events.createOrReplaceTempView("groupedevents");
    final Dataset<String> groupedEvents = sparkSession.sql("select identifier, sort_array(collect_list(struct(timestamp, struct(x, y) as properties))) as events from groupedevents group by identifier").toJSON();
    groupedEvents.write().text("/output/events");
    sparkSession.stop();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...