Pyspark - использование двух временных индексов для оконной функции - PullRequest
0 голосов
/ 07 мая 2019

У меня есть датафрейм, где в каждой строке есть два столбца даты.Я хотел бы создать оконную функцию с диапазоном между, который считает количество строк в определенном диапазоне, где ОБА столбцы даты находятся в пределах диапазона.В приведенном ниже случае обе временные метки строки должны быть перед временной меткой текущей строки, чтобы быть включенными в счет.

Пример df, включая столбец count:

    +---+-----------+-----------+-----+
    | ID|Timestamp_1|Timestamp_2|Count|
    +---+-----------+-----------+-----+
    |  a|          0|          3|    0|
    |  b|          2|          5|    0|
    |  d|          5|          5|    3|
    |  c|          5|          9|    3|
    |  e|          8|         10|    4|
    +---+-----------+-----------+-----+

Я попытался создать два окна и создать новый столбец для обоих из них:

    w_1 = Window.partitionBy().orderBy('Timestamp_1').rangeBetween(Window.unboundedPreceding, 0)
    w_2 = Window.partitionBy().orderBy('Timestamp_2').rangeBetween(Window.unboundedPreceding, 0)

    df = df.withColumn('count', F.count('ID').over(w_1).over(w_2))

Однако этоне допускается в Pyspark и поэтому выдает ошибку.

Есть идеи?Решения в SQL тоже хороши!

1 Ответ

1 голос
/ 20 мая 2019

Будет ли работать самообъединение?

from pyspark.sql import functions as F

df_count = (
    df.alias('a')
    .join(
        df.alias('b'),
        (F.col('b.Timestamp_1') <= F.col('a.Timestamp_1')) &
        (F.col('b.Timestamp_2') <= F.col('a.Timestamp_2')),
        'left'
    )
    .groupBy(
        'a.ID'
    )
    .agg(
        F.count('b.ID').alias('count')
    )
)

df = df.join(df_count, 'ID')
...