Конвертируйте функцию Pandas, наиболее подходящую в pyspark - PullRequest
0 голосов
/ 18 марта 2019

Я использовал эту функцию для создания функции временных рядов в Pandas, которая возвращает (OLS?) Наиболее подходящий наклон заданного диапазона точек:

def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X) 
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    b = numer / denum
    return b

Вот простой пример, показывающий результаты (см. Окончательный вариант ниже):

import pandas as pd
import numpy as np
import random
cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

Затем я применяю функцию best_fit, чтобы получить наклон предыдущих 5 точек:

for i,row in df.iterrows():
  if i>=5:
    X = df['x_vals'][i-5:i]
    Y = df['y_vals'][i-5:i]
    df.loc[i,'slope'] = best_fit(X, Y)
df

Что дает мне это:

x_vals  y_vals  slope
0   -0.648205   NaN
1   0.282729    NaN
2   0.785474    NaN
3   1.48546     NaN
4   0.408165    NaN
5   1.61244     0.331548
6   2.60868     0.228211
7   3.77621     0.377338
8   4.08937     0.678201
9   4.34625     0.952618
10  5.47554     0.694832
11  7.90902     0.630377
12  8.83912     0.965180
13  9.01195     1.306227
14  11.8244     1.269497
15  13.3199     1.380057
16  15.2751     1.380692
17  15.3959     1.717981
18  18.454      1.621861
19  20.0773     1.533528

Мне нужно получить тот же столбец наклона из фрейма данных pyspark вместо Pandas, только я изо всех сил пытаюсь найти отправную точку для этого (окно pyspark ?, Встроенная функция OLS ?, udf?).

Ответы [ 2 ]

1 голос
/ 19 марта 2019

Используйте окно Pyspark, соберите предыдущие значения 5 col в виде списка и вызовите best_fit_udf

#moodified this function to handle 0 division and size of elements
def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X)
    if n < 6 :
       return None
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    if denum == 0:
       return None
    else:
       return numer / denum

best_fit_udf = udf(best_fit, DoubleType())

cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

spark_df = spark.createDataFrame(df)

w = Window.orderBy("x_vals").rowsBetween(-5, 0)

df = spark_df.select("x_vals","y_vals",(F.collect_list('x_vals')).over(w).alias("x_list"), (F.collect_list('y_vals')).over(w).alias("y_list"))

df.withColumn("slope", best_fit_udf('x_list','y_list') ).drop('x_list','y_list').show()

, что дает мне

+------+--------------------+------------------+
|x_vals|              y_vals|             slope|
+------+--------------------+------------------+
|     0|-0.05626232194330516|              null|
|     1|  1.0626613654187942|              null|
|     2|-0.18870622421238525|              null|
|     3|  1.7106172105001147|              null|
|     4|  1.9398571272258158|              null|
|     5|  2.3632022124308474| 0.475092382628695|
|     6|  1.7264493731921893|0.3201115790149247|
|     7|   3.298712278452215|0.5116552596172641|
|     8|  4.3179382280764305|0.4707547914949186|
|     9|    4.00691449276564|0.5077645079970263|
|    10|   6.085792506183289|0.7563877936316236|
|    11|   7.272669055040746|1.0223232959178614|
|    12|    8.70598472345308| 1.085126649123283|
|    13|  10.141576882812515|1.2686365861314373|
|    14|  11.170519757896672| 1.411962717827295|
|    15|  11.999868557507794|1.2199864149871311|
|    16|   14.86294824152797|1.3960568659909833|
|    17|  16.698964370210007| 1.570238888844051|
|    18|   18.71951724368806|1.7810890092953742|
|    19|  20.428078271618062|1.9509358501665701|
+------+--------------------+------------------+
0 голосов
/ 20 марта 2019

Спасибо @Ranga Vure.Я проверил вашу функцию на соответствие исходной функции best_fit (с вашими значениями, конечно) и получаю разные значения для наклона.Вот что дает функция best_fit (значения y_vals выглядят округленными, но это не так):

x_vals  y_vals      slope_py
0       -0.0562623  NaN
1       1.06266     NaN
2       -0.188706   NaN
3       1.71062     NaN
4       1.93986     NaN
5       2.3632      0.464019
6       1.72645     0.472965
7       3.29871     0.448290
8       4.31794     0.296278
9       4.00691     0.569167
10      6.08579     0.587891
11      7.27267     0.942689
12      8.70598     0.971577
13      10.1416     1.204185
14      11.1705     1.488952
15      11.9999     1.303836
16      14.8629     1.191893
17      16.699      1.417222
18      18.7195     1.680720
19      20.4281     1.979709

Я перевел функцию best_fit в sqlContext, и это дало мне те же значенияв качестве исходной функции best_fit:

spark_df.createOrReplaceTempView('tempsql')
df_sql = sqlContext.sql("""
SELECT *,
(((sum(x_vals*y_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(y_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5) /
((sum(x_vals*x_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5)) as slope_sql
from tempsql 
""")

Это дает те же значения, что и исходная функция best_fit, за исключением точек 3-5, когда она рассчитывает наклон до предполагаемого начала (т. е. 6-я точка) - небольшая причуда, но я могу жить с ней:

+------+-------------------+-------------------+--------------------+
|x_vals|             y_vals|           slope_py|           slope_sql|
+------+-------------------+-------------------+--------------------+
|     0|-0.0562623219433051|                NaN|                null|
|     1|   1.06266136541879|                NaN|                null|
|     2| -0.188706224212385|                NaN|  1.0767269459046163|
|     3|   1.71061721050011|                NaN|0.060822882948800006|
|     4|   1.93985712722581|                NaN|  0.4092836048203674|
|     5|   2.36320221243084| 0.4640194743419549|  0.4640194743419549|
|     6|   1.72644937319218| 0.4729645045462295|  0.4729645045462295|
|     7|   3.29871227845221|0.44828961967398656|  0.4482896196739862|
|     8|   4.31793822807643| 0.2962782381870575| 0.29627823818705823|
|     9|   4.00691449276564|  0.569167226772261|  0.5691672267722595|
...