У меня похожая проблема, как описано здесь , но почему-то я не могу заставить ее работать.
Один из моих столбцов в моем фрейме данных имеет вложенный массив с массивами, содержащими 18 Structкаждый.(см. пример ниже) Мне нужно суммировать вложенные области, чтобы у меня в массиве было 18 структур.
Мой фрейм данных (замените 0 любым числом):
root
...
|-- yearAgg: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: struct (containsNull = true)
| | | |-- year: integer (nullable = false)
| | | |-- area_loss: double (nullable = false)
| | | |-- biomass_loss: double (nullable = false)
| | | |-- carbon_emissions: double (nullable = false)
| | | |-- mangrove_biomass_loss: double (nullable = false)
| | | |-- mangrove_carbon_emissions: double (nullable = false)
+--------------------------------------------------------------------------------------------------------------------------------------------------+
yearAgg
+--------------------------------------------------------------------------------------------------------------------------------------------------+
[ [[1, 0, 0, 0, 0, 0],[2, 0, 0, 0, 0, 0], ... ,[18, 0, 0, 0, 0, 0]], [[...]], ...]
[ [[1, 0, 0, 0, 0, 0],[2, 0, 0, 0, 0, 0], ... ,[18, 0, 0, 0, 0, 0]], [[...]], ...]
Я хочу получить(замените 0 на любое число), значения 1 - 18 должны остаться неизменными, и должны быть суммированы только значения Struct с одинаковым идентификатором:
root
|-- Sum: array (nullable = false)
| |-- element: struct (containsNull = true)
| | |-- year: integer (nullable = false)
| | |-- area_loss: double (nullable = false)
| | |-- biomass_loss: double (nullable = false)
| | |-- carbon_emissions: double (nullable = false)
| | |-- mangrove_biomass_loss: double (nullable = false)
| | |-- mangrove_carbon_emissions: double (nullable = false)
+-------------------------------------------------------- -------+
Sum
+---------------------------------------------------------------+
[[1, 0, 0, 0, 0, 0],[2, 0, 0, 0, 0, 0], ... ,[18, 0, 0, 0, 0, 0]]
В качестве первой попытки я попытался это сделать:
spark.sql("""
| SELECT
| *,
| AGGREGATE(yearAgg, empty,
| (acc, year_data) -> zip_with(acc, year_data, (x, y) ->
| struct(x.year as year,
| x.area_loss + y.area_loss as area_loss,
| x.biomass_loss + y.biomass_loss as biomass_loss,
| x.carbon_emissions + y.carbon_emissions as carbon_emissions,
| x.mangrove_biomass_loss + y.mangrove_biomass_loss as mangrove_biomass_loss,
| x.mangrove_carbon_emissions + y.mangrove_carbon_emissions as mangrove_carbon_emissions)) )
| ) FROM yearAggs""".stripMargin)
ошибка, которую я получаю:
cannot resolve 'aggregate ... due to data type mismatch: argument 3 requires array<struct<year:int,area_loss:double,biomass_loss:double,carbon_emissions:double,mangrove_biomass_loss:double,mangrove_carbon_emissions:double>> type, however,'lambdafunction(zip_with ... is of array<struct<year:int,area_loss:double,biomass_loss:double,carbon_emissions:double,mangrove_biomass_loss:double,mangrove_carbon_emissions:double>> type.
Я не совсем уверен, как еще подойти к этому.Может кто-нибудь помочь?