Как рекурсивно вызвать FlatMapFunction ? - PullRequest
2 голосов
/ 07 мая 2020

У меня есть Dataset df, читаем с помощью spark.read (). json

Его схема выглядит примерно так:

root
 |-- items: struct (nullable = true)
 |    |-- item: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- batch-id column: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

Я хочу использовать FlatMapFunction, чтобы получить набор данных с внутренней схемой (идентификатор, имя, тип).

Я хочу сделать что-то вроде следующего:

df.flatMap(mapperFunction(),RowEncoder.apply(someSchema);

public static FlatMapFunction<Row,Row> mapperFunction() {
    return row -> {
      Row r1 = row.getAs("items");
      List<Row> r2 = r1.getList(0);    //This will explode the column
      StructType schema = r2.get(0).schema();
      //I know list doesn't have map function, I want to know what can be done here
      return r2.flatMap(mapperFunction(),RowEncoder.apply(schema);    
    };
  }

1 Ответ

0 голосов
/ 24 мая 2020

Есть несколько вариантов:

Вариант 1 : используйте explode

Самый простой способ сгладить структуру данных - использовать разнести вместо вызова flatMap:

Начиная с данных

{"items": {"item": [{"batch-id":1,"id":"id1","name":"name1","type":"type1"},{"batch-id":2,"id":"id2","name":"name2","type":"type2"}]}}

код

df.withColumn("exploded", explode(col("items.item"))).select("exploded.*").show();

печатает

+--------+---+-----+-----+
|batch_id| id| name| type|
+--------+---+-----+-----+
|       1|id1|name1|type1|
|       2|id2|name2|type2|
+--------+---+-----+-----+

Вариант 2 : используйте flatMap

Если требуется вызов flatMap (например, чтобы добавить больше logi c в сопоставление), этот код выводит тот же результат:

df.flatMap(mapperFunction(), Encoders.bean(Data.class)).show();

с функцией сопоставления

private static FlatMapFunction<Row, Data> mapperFunction() {
    return row -> {
        Row r1 = row.getAs("items");
        List<Row> r2 = r1.getList(0);    //This will explode the column
        return r2.stream().map(entry -> {
            Data d = new Data();
            d.setBatch_id(entry.getLong(0));
            d.setId(entry.getString(1));
            d.setName(entry.getString(2));
            d.setType(entry.getString(3));
            return d;
        }).iterator();
    };
}

и бином данных

public static class Data implements Serializable {
    private long batch_id;
    private String id;
    private String name;
    private String type;

    //getters and setters
}
...