pyspark: получить последнее наблюдение в каждой подгруппе - PullRequest
0 голосов
/ 20 марта 2020

Я новичок ie в pyspark. Я хочу получить последнее наблюдение в конце каждой минуты для каждой акции. Мой высокочастотный фрейм данных выглядит следующим образом:

+-----+--------+-------+----------+----------+----------+
|stock| date   | hour  |  minute  |  second  |  price   |
+-----+--------+-------+----------+----------+----------+
 VOD  | 01-02  |  10   |   13     |   11     |  85.35   |
 VOD  | 01-02  |  10   |   13     |   12     |  85.75   |
 VOD  | 01-02  |  10   |   14     |   09     |  84.35   |    
 VOD  | 01-02  |  10   |   14     |   16     |  82.85   |   
 VOD  | 01-02  |  10   |   14     |   26     |  85.65   |   
 VOD  | 01-02  |  10   |   15     |   07     |  84.35   |    
 ...     ...      ...     ....       ...         ...
 ABC  | 01-02  |  11   |   13     |   11     |  25.35   |
 ABC  | 01-02  |  11   |   13     |   15     |  25.39   |
 ABC  | 01-02  |  11   |   13     |   19     |  25.26   |

Желаемый результат должен быть примерно таким:

+-----+--------+-------+-------+-------+
|stock| date   | hour  | minute| Price | 
+-----+--------+-------+-------+-------+
 VOD  | 01-02  |  10   |  13   | 85.75 |
 VOD  | 01-02  |  10   |  14   | 85.65 |
 VOD  | 01-02  |  10   |  15   | 84.35 |
 VOD  | 01-02  |  10   |  16   | 85.75 |
 ...     ...      ...    ....     ...       
 ABC  | 01-02  |  11   |  13   | 25.26 |   

Я знал, что, вероятно, мне придется использовать синтаксис partitionBy и orderBy, чтобы получить результаты, но я запутался с этими двумя. Я знаком с функцией groupby в SQL. Интересно, какая из них больше похожа на функцию groupby? Может кто-нибудь помочь? Спасибо. Оставайтесь в безопасности и держите себя в руках .

Ответы [ 2 ]

1 голос
/ 20 марта 2020

Мы можем использовать window функцию и раздел на 'stock', 'date', 'hour', 'minute' для создания нового кадра.

  • Для этого случая мы можем заказать на **second ** столбец и в descending порядке.

  • Тогда мы можем только выбрать first row из рамки окна.

Example:

df.show()
#+-----+-----+----+------+------+-----+
#|stock| date|hour|minute|second|price|
#+-----+-----+----+------+------+-----+
#|  VOD|01-02|  10|    13|    11|85.35|
#|  VOD|01-02|  10|    13|    12|85.75|
#|  VOD|01-02|  10|    14|    09|84.35|
#|  VOD|01-02|  10|    14|    16|82.85|
#|  VOD|01-02|  10|    14|    26|85.65|
#+-----+-----+----+------+------+-----+

from pyspark.sql.window import Window
from pyspark.sql.functions import *

w = Window.partitionBy('stock', 'date', 'hour', 'minute').orderBy(desc('second'))

#adding rownumber to the data
df.withColumn("rn",row_number().over(w)).show()

#+-----+-----+----+------+------+-----+---+
#|stock| date|hour|minute|second|price| rn|
#+-----+-----+----+------+------+-----+---+
#|  VOD|01-02|  10|    13|    12|85.75|  1|
#|  VOD|01-02|  10|    13|    11|85.35|  2|
#|  VOD|01-02|  10|    14|    26|85.65|  1|
#|  VOD|01-02|  10|    14|    16|82.85|  2|
#|  VOD|01-02|  10|    14|    09|84.35|  3|
#+-----+-----+----+------+------+-----+---+

#then select only the first row as we are ordering descending.
df.withColumn("rn",row_number().over(w)).filter(col("rn") == 1).drop("second","rn").show()
#+-----+-----+----+------+-----+
#|stock| date|hour|minute|price|
#+-----+-----+----+------+-----+
#|  VOD|01-02|  10|    13|85.75|
#|  VOD|01-02|  10|    14|85.65|
#+-----+-----+----+------+-----+
0 голосов
/ 20 марта 2020

После нескольких следов и ошибок. Кажется, я получил решение. Просто создайте столбец с совокупным значением цены, а затем выберите строку с наибольшей ценой.

w1(Window.partitionBy(df_trade['stock'],df_trade['date'],df_trade['hour'],df_trade['minute']).orderBy(df_trade['second']))

df1=df[['stock', 'date','hour','minute','second','price']].withColumn('subgroup',psf.sum('price').over(w1))
df1.orderBy(['stock', 'date','hour','minute','second']).show() 
 # create a column named subgroup to get the cumulative value of price

w=Window.partitionBy('stock', 'date','hour','minute','second')
df3=df1.withColumn('max',psf.max('subgroup').over(w)).where(psf.col('subgroup')==psf.col('max')).drop('max')        
#Get the row with largest value of cumulative price

df3=df3.orderBy(['stock', 'date','hour','minute','second'], ascending=[True, True,True,True,True]).drop('subgroup')

df3=df3.withColumnRenamed('price','lastprice')   # rename
df3.show()    
...