функция задержки pyspark с несовместимым временным рядом - PullRequest
0 голосов
/ 20 ноября 2018
import pyspark.sql.functions as F
from pyspark.sql.window import Window

Я хотел бы использовать оконную функцию, чтобы найти значение из столбца 4 периода назад.

Предположим, мои данные (df) выглядят так (на самом деле у меня много разных идентификаторов):

ID | value | period

a  |  100  |   1   
a  |  200  |   2   
a  |  300  |   3   
a  |  400  |   5   
a  |  500  |   6   
a  |  600  |   7   

Если бы временной ряд был постоянным (например, период 1-6), я мог бы просто использовать F.lag(df['value'], count=4).over(Window.partitionBy('id').orderBy('period'))

Однако, поскольку временной ряд имеет разрыв, значения будут смещены.

Мой желаемый вывод будет таким:

ID | value | period | 4_lag_value
a  |  100  |   1    |     nan
a  |  200  |   2    |     nan 
a  |  300  |   3    |     nan
a  |  400  |   5    |     100
a  |  500  |   6    |     200
a  |  600  |   7    |     300

Как я могу сделать это в pyspark?

1 Ответ

0 голосов
/ 20 ноября 2018

Я придумала решение, но оно кажется излишне уродливым, приветствовало бы что-нибудь лучшее!

data = spark.sparkContext.parallelize([
        ('a',100,1),
        ('a',200,2),
        ('a',300,3),
        ('a',400,5),
        ('a',500,6),
        ('a',600,7)])

df = spark.createDataFrame(data, ['id','value','period'])

window = Window.partitionBy('id').orderBy('period')

# look 1, 2, 3 and 4 rows behind:
for diff in [1,2,3,4]:
    df = df.withColumn('{}_diff'.format(diff),
                       df['period'] - F.lag(df['period'], count=diff).over(window))

# if any of these are 4, that's the lag we need
# if not, there is no 4 period lagged return, so return None

#initialise col
df = df.withColumn('4_lag_value', F.lit(None))
# loop:
for diff in [1,2,3,4]:
    df = df.withColumn('4_lag_value',
                       F.when(df['{}_diff'.format(diff)] == 4,
                                 F.lag(df['value'], count=diff).over(window))
                              .otherwise(df['4_lag_value']))

# drop working cols
df = df.drop(*['{}_diff'.format(diff) for diff in [1,2,3,4]])

Это возвращает желаемый результат.

...