Я пытаюсь заставить spark применять оконную функцию только к указанному подмножеству кадра данных, в то время как фактическое окно имеет доступ к строкам за пределами этого подмножества.Позвольте мне перейти к примеру:
У меня есть искровой фрейм данных, который был сохранен в формате hdf.Фрейм данных содержит события, поэтому у каждой строки есть метка времени, идентификатор и целочисленная функция.Также есть столбец, который я вычисляю, который представляет собой функцию окна суммы, сделанную так:
df = spark.table("some_table_in_hdfs")
w = Window.partitionBy("id").orderBy("date")
df = df.withColumn("feat_int_sum", F.sum("feat_int").over(w))
df.show()
+----------+--------+---+------------+
| date|feat_int| id|feat_int_sum|
+----------+--------+---+------------+
|2018-08-10| 5| 0| 5|
|2018-08-12| 27| 0| 32|
|2018-08-14| 3| 0| 35|
|2018-08-11| 32| 1| 32|
|2018-08-12| 552| 1| 584|
|2018-08-16| 2| 1| 586|
+----------+--------+---+------------+
Когда я загружаю новые данные из другого источника, я хотел бы добавить к приведенному выше фрейму данныхв формате hdf.Чтобы сделать это, я должен применить оконную функцию и к новым данным.Я хотел бы объединить два фрейма данных, чтобы оконная функция имела доступ к «старым» feat_int
значениям для суммирования.
df_new = spark.table("some_new_table")
df_new.show()
+----------+--------+---+
| date|feat_int| id|
+----------+--------+---+
|2018-08-20| 65| 0|
|2018-08-23| 3| 0|
|2018-08-24| 4| 0|
|2018-08-21| 69| 1|
|2018-08-25| 37| 1|
|2018-08-26| 3| 1|
+----------+--------+---+
df_union = df.union(df_new.withColumn("feat_int_sum", F.lit(None)))
df_union.show()
+----------+--------+---+------------+
| date|feat_int| id|feat_int_sum|
+----------+--------+---+------------+
|2018-08-10| 5| 0| 5|
|2018-08-12| 27| 0| 32|
|2018-08-14| 3| 0| 35|
|2018-08-20| 65| 0| null|
|2018-08-23| 3| 0| null|
|2018-08-24| 4| 0| null|
|2018-08-11| 32| 1| 32|
|2018-08-12| 552| 1| 584|
|2018-08-16| 2| 1| 586|
|2018-08-21| 69| 1| null|
|2018-08-25| 37| 1| null|
|2018-08-26| 3| 1| null|
+----------+--------+---+------------+
Проблема в том, что я хотел бы применить оконную функцию суммы к df_union
, но только к строкам с null
в feat_int_sum
.Причина в том, что я не хочу пересчитывать оконную функцию для всех значений, которые уже вычислены в df
.Таким образом, желаемый результат будет примерно таким:
+----------+--------+---+------------+-----------------+
| date|feat_int| id|feat_int_sum|feat_int_sum_temp|
+----------+--------+---+------------+-----------------+
|2018-08-10| 5| 0| 5| null|
|2018-08-12| 27| 0| 32| null|
|2018-08-14| 3| 0| 35| null|
|2018-08-20| 65| 0| null| 100|
|2018-08-23| 3| 0| null| 103|
|2018-08-24| 4| 0| null| 107|
|2018-08-11| 32| 1| 32| null|
|2018-08-12| 552| 1| 584| null|
|2018-08-16| 2| 1| 586| null|
|2018-08-21| 69| 1| null| 655|
|2018-08-25| 37| 1| null| 692|
|2018-08-26| 3| 1| null| 695|
+----------+--------+---+------------+-----------------+
Я попытался обернуть оконную функцию в оператор when
следующим образом:
df_union.withColumn("feat_int_sum_temp", F.when(F.col('date') > '2018-08-16', F.sum('feat_int').over(w))
Но, глядя на план объяснения искрыПохоже, что он будет запускать оконную функцию для всех строк, а затем применять условие when.Причина, по которой я не хочу запускать оконную функцию для старых строк, заключается в том, что я имею дело с некоторыми действительно большими таблицами и не хочу тратить впустую вычислительные ресурсы, пересчитывая значения, которые не будут использоваться.После этого шага я объединил бы столбцы feat_int_sum
и feat_int_sum_temp
и добавил бы только новую часть данных в hdfs.Я был бы признателен за любые советы о том, как заставить spark применять только оконную функцию к указанному подмножеству, в то время как фактическое окно имеет доступ к строкам за пределами этого подмножества.