Прежде всего, ваш код Панд неверен. Это просто не будет работать, Spark или нет
pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean())
Другая проблема - это схема вывода, которая в зависимости от ваших данных не будет в действительности соответствовать результату:
- Если хотите добавить ewm, схема должна быть расширена.
- Если вы хотите вернуть только ewm, тогда схема слишком большая.
- Если вы хотите просто заменить, он может не соответствовать типу.
Давайте предположим, что это первый сценарий (я позволил себе немного переписать ваш код):
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import DoubleType, StructField
def exp_ma(df, group_col='Name', sort_col='Date'):
schema = (df.select(group_col, sort_col, 'count')
.schema.add(StructField('ewma', DoubleType())))
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ema(pdf):
pdf['ewm'] = pdf['count'].ewm(span=5, min_periods=1).mean()
return pdf
return df.groupby('Name').apply(ema)
df = spark.createDataFrame(
[("a", 1, 1), ("a", 2, 3), ("a", 3, 3), ("b", 1, 10), ("b", 8, 3), ("b", 9, 0)],
("name", "date", "count")
)
exp_ma(df).show()
# +----+----+-----+------------------+
# |Name|Date|count| ewma|
# +----+----+-----+------------------+
# | b| 1| 10| 10.0|
# | b| 8| 3| 5.800000000000001|
# | b| 9| 0|3.0526315789473686|
# | a| 1| 1| 1.0|
# | a| 2| 3| 2.2|
# | a| 3| 3| 2.578947368421052|
# +----+----+-----+------------------+
Я не использую много панд, так что может быть более элегантный способ сделать это.