Pyspark применяет линейную регрессию к условию проверки окна - PullRequest
0 голосов
/ 31 мая 2019

У меня есть фрейм данных, упорядоченный по id , base_date , base_date_2 со связанным значением .Я вычислил совокупную сумму этих дельт для одного и того же (id, base_date).

df.show()  

+---+-------------+-------------+----------+------------------+
|id |  base_date  | base_date_2 |    value | base_date_cumsum |              
+---+-------------+-------------+----------+------------------+
| 1 |  2017-07-03 |  2014-09-09 |   1.0122 |        0         |              
| 1 |  2017-07-03 |  2015-02-11 |   1.6013 |        155       |               
| 1 |  2017-07-03 |  2015-09-11 |   1.6298 |        367       |               
| 2 |  2018-10-22 |  2014-06-24 |   1.6554 |        0         |         
| 2 |  2018-10-22 |  2014-10-17 |   2.1543 |        115       |             
| 2 |  2018-10-22 |  2015-05-27 |   2.2294 |        337       |
| 2 |  2018-10-22 |  2017-03-21 |   1.5201 |        1001      |              
| 2 |  2018-10-22 |  2017-09-23 |   0.9193 |        1187      |            
| 3 |  2018-06-07 |  2014-06-30 |   1.0495 |        0         |      
| 3 |  2018-06-07 |  2015-08-06 |   1.3706 |        402       |           
| 3 |  2018-06-07 |  2015-12-22 |   1.3643 |        540       |          
| 3 |  2018-06-07 |  2016-05-30 |   1.9342 |        700       |          
| 3 |  2018-06-07 |  2017-02-27 |   0.3606 |        973       |         
+---+-------------+-------------+----------+------------------+

Для целей проектирования функций я хочу вычислить коэффициент наклона линейной регрессии для одного и того же (id, base_date), то есть для разных base_date_2 и для разных периодов времени (1 месяц, 3 месяца, 5 месяцев и 1 год).

Чтобы получить этот коэффициент, я нашел способ, используя corr и stddev :

slope = corr (base_date_cumsum, value) * stddev (значение) / stddev (base_date_cumsum)

Затем я применяю это с соответствующим окном:

my_window =  Window.partitionBy('t_chassis', 'base_date')

df = df.withColumn('slope_coef',
    (stddev(col('value')).over(my_window))*
    ((corr(col('base_date_cumsum'), col('value')).over(my_window))/
    (stddev(col('base_date_cumsum')).over(my_window))))

Дача:

+---+-------------+-------------+----------+------------------+-------------+
|id |  base_date  | base_date_2 |    value | base_date_cumsum | slope_coef  |          
+---+-------------+-------------+----------+------------------+-------------+
| 1 |  2017-07-03 |  2014-09-09 |   1.0122 |        0         |  0.00159095 |
| 1 |  2017-07-03 |  2015-02-11 |   1.6013 |        155       |  0.00159095 | 
| 1 |  2017-07-03 |  2015-09-11 |   1.6298 |        367       |  0.00159095 | 
| 2 |  2018-10-22 |  2014-06-24 |   1.6554 |        0         | -0.00075601 |
| 2 |  2018-10-22 |  2014-10-17 |   2.1543 |        115       | -0.00075601 |
| 2 |  2018-10-22 |  2015-05-27 |   2.2294 |        337       | -0.00075601 |
| 2 |  2018-10-22 |  2017-03-21 |   1.5201 |        1001      | -0.00075601 |
| 2 |  2018-10-22 |  2017-09-23 |   0.9193 |        1187      | -0.00075601 |
| 3 |  2018-06-07 |  2014-06-30 |   1.0495 |        0         | -0.00035787 |
| 3 |  2018-06-07 |  2015-08-06 |   1.3706 |        402       | -0.00035787 |
| 3 |  2018-06-07 |  2015-12-22 |   1.3643 |        540       | -0.00035787 |
| 3 |  2018-06-07 |  2016-05-30 |   1.9342 |        700       | -0.00035787 |
| 3 |  2018-06-07 |  2017-02-27 |   0.3606 |        973       | -0.00035787 |
+---+-------------+-------------+----------+------------------+-------------+

Теперь я хочу добавить 5 новых столбцов, которые вычисляют одно и то же, но в течение определенного периода времени для каждого: coeff_1m , coeff_3m , coeff_5m , coeff_12m и coeff_24m .
Я пытался это:

df = df.withColumn('coeff_1m',
   when((col('base_date_cumsum') <= 31),
   (stddev(col('value')).over(my_window))*
   ((corr(col('date_p1koa_delta_cumsum'), col('value')).over(my_window))/
   (stddev(col('date_p1koa_delta_cumsum')).over(my_window))))

df = df.withColumn('coeff_3m',
   when((col('base_date_cumsum') <= 92),
   (stddev(col('value')).over(my_window))*
   ((corr(col('date_p1koa_delta_cumsum'), col('value')).over(my_window))/
   (stddev(col('date_p1koa_delta_cumsum')).over(my_window))))

etc..

Это возвращаетодинаковый коэффициент для всех 5 столбцов.Он действительно повторяет это несколько раз, когда действительно base_date_cumsum пересекает пороговые значения, но коэффициент не изменяется, что определенно должно быть с момента появления.вычисляется по большему количеству значений.

...