PySpark: раздвижные окна для выборочных строк - PullRequest
2 голосов
/ 26 сентября 2019

У меня есть кадр данных, содержащий следующие 3 столбца: 1. ID 2. метка времени 3. IP_Address

Данные охватывают период с 2019- 07 -01 до 2019-09-20.Я пытаюсь объединить количество IP-адресов за последние 60 дней , разделенных по идентификатору для всех строк между 20-дневным периодом с 2019 по 1007 * 09 -01 по 2019-09-20.

Я попытался использовать следующую оконную функцию, и она прекрасно работает:

days = lambda i: i*86400
w =  Window.partitionBy('id')\
           .orderBy(unix_timestamp(col('timestamp')))\
           .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.withColumn("ip_counts", count(df.ip_address).over(w))

Однако проблема в том, что он вычисляет эти агрегации даже за период, который мне не нужен.расчет: с 2019-07-01 по 2019-08-31.Я мог бы легко отфильтровать результаты за выбранный период ретроспективно после вычислений, но я не хочу ненужных вычислений, поскольку я имею дело с ~ 3-10 миллионами строк в день.

Если я отфильтрую кадр данных следующим образом:

dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast("timestamp") for s in dates]

w =  Window.partitionBy('id')\
           .orderBy(unix_timestamp(col('timestamp')))\
           .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.where((df.timestamp >= date_from) & (df.timestamp <= date_to))\
       .withColumn("ip_counts", count(df.ip_address).over(w))

в этом случае идентификаторы между этими днями не могут получить доступ к данным для этих идентификаторов за предшествующие 60 дней, и поэтому значения неверны.

Что я могу сделать, чтобы вычисляет агрегации только для строк, попадающих между 2019-09-01 и 2019-09-20, и в то же время обеспечивает доступ к данным за предыдущие 60 дней для каждой из этих агрегаций.Большое спасибо за вашу помощь.

1 Ответ

0 голосов
/ 27 сентября 2019

Сначала я создал бы новый фрейм данных, сохраняющий все данные за последние 60 дней, а затем следовал бы вашему первому методу, вычисляя агрегирование только для строк, попадающих между 2019-09-01 и 2019-09-20.

...