С искрой 2.4 появилось много функций более высокого порядка для массивов.(см. https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html)
val dataframe = Seq(
("a", 1, "xxx", "1", 1),
("a", 2, "yyy", "223", 0),
("a", 3, "zzz", "345", 1)
).toDF( "grouping_key", "id" , "someProperty" , "someOtherProperty", "propertyToFilterOn" )
.groupBy("grouping_key")
.agg(collect_list(struct("id" , "someProperty" , "someOtherProperty", "propertyToFilterOn")).as("your_array"))
dataframe.select("your_array").show(false)
+----------------------------------------------------+
|your_array |
+----------------------------------------------------+
|[[1, xxx, 1, 1], [2, yyy, 223, 0], [3, zzz, 345, 1]]|
+----------------------------------------------------+
Вы можете фильтровать элементы в массиве, используя функцию фильтра массива высшего порядка, например:
val filteredDataframe = dataframe.select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))
filteredDataframe.show(false)
+----------------------------------+
|filtered_arrays |
+----------------------------------+
|[[1, xxx, 1, 1], [3, zzz, 345, 1]]|
+----------------------------------+
для «другой логики» вашегоговоря о том, что вы должны иметь возможность использовать функцию массива преобразования более высокого порядка следующим образом:
val tranformedDataframe = filteredDataframe
.select(expr("transform(filtered_arrays, your_struct -> struct(concat(your_struct.someProperty, '_', your_struct.someOtherProperty))"))
, но есть проблемы с возвратом структур из функции преобразования, как описано в этом посте:
http://mail-archives.apache.org/mod_mbox/spark-user/201811.mbox/%3CCALZs8eBgWqntiPGU8N=ENW2Qvu8XJMhnViKy-225ktW+_c0czA@mail.gmail.com%3E
*1017* * *лучше всего использовать API набора данных для преобразования следующим образом:
case class YourStruct(id:String, someProperty: String, someOtherProperty: String)
case class YourArray(filtered_arrays: Seq[YourStruct])
case class YourNewStruct(id:String, newProperty: String)
val transformedDataset = filteredDataframe.as[YourArray].map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))
val transformedDataset.show(false)
+--------------------------+
|value |
+--------------------------+
|[[1, xxx_1], [3, zzz_345]]|
+--------------------------+