Pyspark накопительный продукт с использованием numpy - PullRequest
1 голос
/ 07 апреля 2020

Я хочу выполнить накопительный продукт, предыдущие успешные ответы используют логарифмические суммы c для дела. Тем не менее, есть ли способ использовать Numpy Cumsum. Я пытался без четкого результата, вот мой код:

import numpy   as np

def cumulative_product (x):
    """Calculation of cumulative product using numpy function cumprod.
    """
    return np.cumprod(float(x)).tolist()

spark_cumulative_product = udf(cumulative_product, ArrayType(DoubleType()))

# the dataset in question:
param.show()

Что дает мне, например:

+--------------+-----+
|financial_year|  wpi|
+--------------+-----+
|          2014|1.026|
|          2015|1.024|
|          2016|1.021|
|          2017|1.019|
|          2018|1.021|
+--------------+-----+

При применении

param = param.withColumn('cum_wpi', spark_cumulative_product(param_treasury['wpi']))
param.show()

У меня есть что нет никаких изменений, т.е.

+--------------+-----+-------+
|financial_year|  wpi|cum_wpi|
+--------------+-----+-------+
|          2014|1.026|[1.026]|
|          2015|1.024|[1.024]|
|          2016|1.021|[1.021]|
|          2017|1.019|[1.019]|
|          2018|1.021|[1.021]|
+--------------+-----+-------+

Может кто-нибудь помочь с тем, что идет не так, или если есть лучший способ сделать cumprod без использования exp-sum-log -Update: желаемый результат:

+--------------+-----+-------+
|financial_year|  wpi|cum_wpi|
+--------------+-----+-------+
|          2014|1.026| 1.026 |
|          2015|1.024| 1.051 |
|          2016|1.021| 1.073 |
|          2017|1.019| 1.093 |
|          2018|1.021| 1.116 |
+--------------+-----+-------+

1 Ответ

1 голос
/ 07 апреля 2020

Один из способов добиться этого, используя функцию серии cum_prod() pandas, используя pandas grouped map UDF.

Sample DataFrame:

#+--------------+-----+
#|financial_year|  wpi|
#+--------------+-----+
#|          2014|1.026|
#|          2015|1.024|
#|          2016|1.021|
#|          2017|1.019|
#|          2018|1.021|
#+--------------+-----+

Сначала я создам фиктивный столбец , который будет похож на наш cum_wpi . Я перезапишу этот фиктивный столбец в pandas udf. Использование orderBy прямо перед групповым и применение к гарантируют , что кадр данных отсортирован по financial_year.

import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
df1=df.withColumn("cum_wpi", F.lit(1.2456))
@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
     df1['cum_wpi']=df1['wpi'].cumprod().round(decimals=3)

     return df1
df.orderBy(F.col("financial_year").asc())\
  .groupby().apply(grouped_map).show()

#+--------------+-----+-------+
#|financial_year|  wpi|cum_wpi|
#+--------------+-----+-------+
#|          2014|1.026|  1.026|
#|          2015|1.024|  1.051|
#|          2016|1.021|  1.073|
#|          2017|1.019|  1.093|
#|          2018|1.021|  1.116|
#+--------------+-----+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...