Есть несколько вариантов:
Вариант 1 : используйте explode
Самый простой способ сгладить структуру данных - использовать разнести вместо вызова flatMap
:
Начиная с данных
{"items": {"item": [{"batch-id":1,"id":"id1","name":"name1","type":"type1"},{"batch-id":2,"id":"id2","name":"name2","type":"type2"}]}}
код
df.withColumn("exploded", explode(col("items.item"))).select("exploded.*").show();
печатает
+--------+---+-----+-----+
|batch_id| id| name| type|
+--------+---+-----+-----+
| 1|id1|name1|type1|
| 2|id2|name2|type2|
+--------+---+-----+-----+
Вариант 2 : используйте flatMap
Если требуется вызов flatMap (например, чтобы добавить больше logi c в сопоставление), этот код выводит тот же результат:
df.flatMap(mapperFunction(), Encoders.bean(Data.class)).show();
с функцией сопоставления
private static FlatMapFunction<Row, Data> mapperFunction() {
return row -> {
Row r1 = row.getAs("items");
List<Row> r2 = r1.getList(0); //This will explode the column
return r2.stream().map(entry -> {
Data d = new Data();
d.setBatch_id(entry.getLong(0));
d.setId(entry.getString(1));
d.setName(entry.getString(2));
d.setType(entry.getString(3));
return d;
}).iterator();
};
}
и бином данных
public static class Data implements Serializable {
private long batch_id;
private String id;
private String name;
private String type;
//getters and setters
}