PySpark применяет UDF для экспоненциального взвешенного среднего из массива collect_list - PullRequest
0 голосов
/ 27 мая 2018

В конечном итоге я надеюсь восстановить функциональность, аналогичную описанной в Pyspark SPARK-22239 , которая позволит использовать оконные функции с пользовательскими функциями Pandas.

В частности,Я выполняю основанное на временных метках оконное отображение базовых числовых наблюдений, а затем вычисляю средневзвешенное экспоненциальное значение для каждого окна.У меня есть рабочий подход, но я обеспокоен тем, что он может быть неэффективным и что я могу пропустить более оптимизированное решение.

Я подошел к этой проблеме, используя collect_list, чтобы получить массив числовыхзначения, соответствующие правильному временному окну для каждой строки в кадре данных, и затем применение UDF для вычисления экспоненциально-взвешенного среднего для каждого массива.

from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window


# collect the relevant set of prices for the moving average per row
def mins(t_mins):
    """
    Utility function converting time in mins to time in secs.
    """
    return 60 * t_mins
w = Window.orderBy('date').rangeBetween(-mins(30), 0)
df = df.withColumn('windowed_price', collect_list('price').over(window))

# compute the exponential weighted mean from each array of prices
@udf(DoubleType())
def arr_to_ewm(arr):
    """
    Computes exponential weighted mean per row from array of relevant time points.
    """
    series = pd.Series(arr)
    ewm = series.ewm(alpha=0.5).mean().iloc[-1]
    # make sure return type is python primitive instead of Numpy dtype
    return float(ewm)
df = df.withColumn('price_ema_30mins', arr_to_ewm(df.windowed_price))

Приведенный выше подход работает, но я понимаю, что оба collect_list и udf вычислительно дороги.Есть ли более эффективный подход к выполнению этого вычисления в Pyspark?

...