У меня есть кадр данных, содержащий следующие 3 столбца: 1. ID 2. метка времени 3. IP_Address
Данные охватывают период с 2019- 07 -01 до 2019-09-20.Я пытаюсь объединить количество IP-адресов за последние 60 дней , разделенных по идентификатору для всех строк между 20-дневным периодом с 2019 по 1007 * 09 -01 по 2019-09-20.
Я попытался использовать следующую оконную функцию, и она прекрасно работает:
days = lambda i: i*86400
w = Window.partitionBy('id')\
.orderBy(unix_timestamp(col('timestamp')))\
.rangeBetween(start=-days(60), end=Window.currentRow)
df = df.withColumn("ip_counts", count(df.ip_address).over(w))
Однако проблема в том, что он вычисляет эти агрегации даже за период, который мне не нужен.расчет: с 2019-07-01 по 2019-08-31.Я мог бы легко отфильтровать результаты за выбранный период ретроспективно после вычислений, но я не хочу ненужных вычислений, поскольку я имею дело с ~ 3-10 миллионами строк в день.
Если я отфильтрую кадр данных следующим образом:
dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast("timestamp") for s in dates]
w = Window.partitionBy('id')\
.orderBy(unix_timestamp(col('timestamp')))\
.rangeBetween(start=-days(60), end=Window.currentRow)
df = df.where((df.timestamp >= date_from) & (df.timestamp <= date_to))\
.withColumn("ip_counts", count(df.ip_address).over(w))
в этом случае идентификаторы между этими днями не могут получить доступ к данным для этих идентификаторов за предшествующие 60 дней, и поэтому значения неверны.
Что я могу сделать, чтобы вычисляет агрегации только для строк, попадающих между 2019-09-01 и 2019-09-20, и в то же время обеспечивает доступ к данным за предыдущие 60 дней для каждой из этих агрегаций.Большое спасибо за вашу помощь.