Цель
Цель, которую я хочу достичь, -
- прочитать файл CSV (ОК)
- закодировать его в
Dataset<Person>
, где Person
объект имеет вложенный объект Address[]
. (создает исключение)
CSV-файл Person
В файле с именем person.csv
имеются следующие данные, описывающиенекоторые лица:
name,age,address
"name1",10,"streetA~cityA||streetB~cityB"
"name2",20,"streetA~cityA||streetB~cityB"
Первая строка - это схема, а адрес - это вложенная структура .
Классы данных
Классы данных:
@Data
public class Address implements Serializable {
public String street;
public String city;
}
и
@Data
public class Person implements Serializable {
public String name;
public Integer age;
public Address[] address;
}
Чтение нетипизированных данных
Сначала я попытался прочитать данные изCSV в Dataset<Row>
, который работает как ожидалось:
Dataset<Row> ds = spark.read() //
.format("csv") //
.option("header", "true") // first line has headers
.load("src/test/resources/outer/person.csv");
LOG.info("=============== Print schema =============");
ds.printSchema();
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- address: string (nullable = true)
LOG.info("================ Print data ==============");
ds.show();
+-----+---+--------------------+
| name|age| address|
+-----+---+--------------------+
|name1| 10|streetA~cityA||st...|
|name2| 20|streetA~cityA||st...|
+-----+---+--------------------+
LOG.info("================ Print name ==============");
ds.select("name").show();
+-----+
| name|
+-----+
|name1|
|name2|
+-----+
assertThat(ds.isEmpty(), is(false)); //OK
assertThat(ds.count(), is(2L)); //OK
final List<String> names = ds.select("name").as(Encoders.STRING()).collectAsList();
assertThat(names, hasItems("name1", "name2")); //OK
Кодирование через UserDefinedFunction
Мой udf, который принимает String
и возвращает Address[]
:
private static void registerAsAddress(SparkSession spark) {
spark.udf().register("asAddress", new UDF1<String, Address[]>() {
@Override
public Address[] call(String rowValue) {
return Arrays.stream(rowValue.split(Pattern.quote("||"), -1)) //
.map(object -> object.split("~")) //
.map(Address::fromArgs) //
.map(a -> a.orElse(null)) //
.toArray(Address[]::new);
}
}, //
DataTypes.createArrayType(DataTypes.createStructType(
new StructField[]{new StructField("street", DataTypes.StringType, true, Metadata.empty()), //
new StructField("city", DataTypes.StringType, true, Metadata.empty()) //
})));
}
Звонящий:
@Test
void asAddressTest() throws URISyntaxException {
registerAsAddress(spark);
// given, when
Dataset<Row> ds = spark.read() //
.format("csv") //
.option("header", "true") // first line has headers
.load("src/test/resources/outer/person.csv");
ds.show();
// create a typed dataset
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> typed = ds.withColumn("address2", //
callUDF("asAddress", ds.col("address")))
.drop("address").withColumnRenamed("address2", "address")
.as(personEncoder);
LOG.info("Typed Address");
typed.show();
typed.printSchema();
}
Что приводит к такому исполнению:
Вызывается: java.lang.IllegalArgumentException: Значение (Адрес (улица = улица А, город = город А)) типа (ch.project.data.Address) не может быть преобразован в структуру
Почему он не может преобразовать из Address
в Struct
?