В PySpark я пытаюсь определить пользовательский агрегатор , который накапливает состояние .Возможно ли это в Spark 2.3?
AFAIK, теперь возможно определить пользовательский UDAF в PySpark начиная с Spark 2.3 (см. Как определить и использовать пользовательскую функцию агрегирования в Spark SQL? ), позвонив pandas_udf
с ключевым словом PandasUDFType.GROUPED_AGG
.Однако, учитывая, что он просто принимает функцию в качестве параметра, я не думаю, что можно переносить состояние во время агрегации.
Из Scala я вижу, что возможно агрегирование с сохранением состояния с помощью любого расширения UserDefinedAggregateFunction
или org.apache.spark.sql.expressions.Aggregator
, но могу ли я сделать нечто подобное только на стороне Python?