Как запустить экспоненциально-взвешенную скользящую среднюю в pyspark - PullRequest
0 голосов
/ 30 апреля 2018

Я пытаюсь запустить экспоненциально взвешенное скользящее среднее в PySpark с использованием UDF Pandas Grouped Map. Это не работает, хотя:

def ExpMA(myData):

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.functions import PandasUDFType
    from pyspark.sql import SQLContext 

    df = myData
    group_col = 'Name'
    sort_col = 'Date'

    schema = df.select(group_col, sort_col,'count').schema
    print(schema)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        Model = pd.DataFrame(pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean()))
        return Model

    data = df.groupby('Name').apply(ema)

    return data

Я также попытался запустить его без Udf Pandas, просто написав уравнение ewma в PySpark, но проблема в том, что уравнение ewma содержит запаздывание текущего ewma.

1 Ответ

0 голосов
/ 03 мая 2018

Прежде всего, ваш код Панд неверен. Это просто не будет работать, Spark или нет

pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean())

Другая проблема - это схема вывода, которая в зависимости от ваших данных не будет в действительности соответствовать результату:

  • Если хотите добавить ewm, схема должна быть расширена.
  • Если вы хотите вернуть только ewm, тогда схема слишком большая.
  • Если вы хотите просто заменить, он может не соответствовать типу.

Давайте предположим, что это первый сценарий (я позволил себе немного переписать ваш код):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import DoubleType, StructField

def exp_ma(df, group_col='Name', sort_col='Date'):
    schema = (df.select(group_col, sort_col, 'count')
        .schema.add(StructField('ewma', DoubleType())))

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        pdf['ewm'] = pdf['count'].ewm(span=5, min_periods=1).mean()
        return pdf

    return df.groupby('Name').apply(ema)

df = spark.createDataFrame(
    [("a", 1, 1), ("a", 2, 3), ("a", 3, 3), ("b", 1, 10), ("b", 8, 3), ("b", 9, 0)], 
    ("name", "date", "count")
)

exp_ma(df).show()
# +----+----+-----+------------------+                                            
# |Name|Date|count|              ewma|
# +----+----+-----+------------------+
# |   b|   1|   10|              10.0|
# |   b|   8|    3| 5.800000000000001|
# |   b|   9|    0|3.0526315789473686|
# |   a|   1|    1|               1.0|
# |   a|   2|    3|               2.2|
# |   a|   3|    3| 2.578947368421052|
# +----+----+-----+------------------+

Я не использую много панд, так что может быть более элегантный способ сделать это.

...