Применение функции искрового окна к подмножеству данных - PullRequest
0 голосов
/ 25 сентября 2018

Я пытаюсь заставить spark применять оконную функцию только к указанному подмножеству кадра данных, в то время как фактическое окно имеет доступ к строкам за пределами этого подмножества.Позвольте мне перейти к примеру:

У меня есть искровой фрейм данных, который был сохранен в формате hdf.Фрейм данных содержит события, поэтому у каждой строки есть метка времени, идентификатор и целочисленная функция.Также есть столбец, который я вычисляю, который представляет собой функцию окна суммы, сделанную так:

df = spark.table("some_table_in_hdfs")

w = Window.partitionBy("id").orderBy("date")
df = df.withColumn("feat_int_sum", F.sum("feat_int").over(w))

df.show()
+----------+--------+---+------------+
|      date|feat_int| id|feat_int_sum|
+----------+--------+---+------------+
|2018-08-10|       5|  0|           5|
|2018-08-12|      27|  0|          32|
|2018-08-14|       3|  0|          35|
|2018-08-11|      32|  1|          32|
|2018-08-12|     552|  1|         584|
|2018-08-16|       2|  1|         586|
+----------+--------+---+------------+

Когда я загружаю новые данные из другого источника, я хотел бы добавить к приведенному выше фрейму данныхв формате hdf.Чтобы сделать это, я должен применить оконную функцию и к новым данным.Я хотел бы объединить два фрейма данных, чтобы оконная функция имела доступ к «старым» feat_int значениям для суммирования.

df_new = spark.table("some_new_table")
df_new.show()
+----------+--------+---+
|      date|feat_int| id|
+----------+--------+---+
|2018-08-20|      65|  0|
|2018-08-23|       3|  0|
|2018-08-24|       4|  0|
|2018-08-21|      69|  1|
|2018-08-25|      37|  1|
|2018-08-26|       3|  1|
+----------+--------+---+

df_union = df.union(df_new.withColumn("feat_int_sum", F.lit(None)))
df_union.show()
+----------+--------+---+------------+
|      date|feat_int| id|feat_int_sum|
+----------+--------+---+------------+
|2018-08-10|       5|  0|           5|
|2018-08-12|      27|  0|          32|
|2018-08-14|       3|  0|          35|
|2018-08-20|      65|  0|        null|
|2018-08-23|       3|  0|        null|
|2018-08-24|       4|  0|        null|
|2018-08-11|      32|  1|          32|
|2018-08-12|     552|  1|         584|
|2018-08-16|       2|  1|         586|
|2018-08-21|      69|  1|        null|
|2018-08-25|      37|  1|        null|
|2018-08-26|       3|  1|        null|
+----------+--------+---+------------+

Проблема в том, что я хотел бы применить оконную функцию суммы к df_union, но только к строкам с null в feat_int_sum.Причина в том, что я не хочу пересчитывать оконную функцию для всех значений, которые уже вычислены в df.Таким образом, желаемый результат будет примерно таким:

+----------+--------+---+------------+-----------------+
|      date|feat_int| id|feat_int_sum|feat_int_sum_temp|
+----------+--------+---+------------+-----------------+
|2018-08-10|       5|  0|           5|             null|
|2018-08-12|      27|  0|          32|             null|
|2018-08-14|       3|  0|          35|             null|
|2018-08-20|      65|  0|        null|              100|
|2018-08-23|       3|  0|        null|              103|
|2018-08-24|       4|  0|        null|              107|
|2018-08-11|      32|  1|          32|             null|
|2018-08-12|     552|  1|         584|             null|
|2018-08-16|       2|  1|         586|             null|
|2018-08-21|      69|  1|        null|              655|
|2018-08-25|      37|  1|        null|              692|
|2018-08-26|       3|  1|        null|              695|
+----------+--------+---+------------+-----------------+

Я попытался обернуть оконную функцию в оператор when следующим образом:

df_union.withColumn("feat_int_sum_temp", F.when(F.col('date') > '2018-08-16', F.sum('feat_int').over(w))

Но, глядя на план объяснения искрыПохоже, что он будет запускать оконную функцию для всех строк, а затем применять условие when.Причина, по которой я не хочу запускать оконную функцию для старых строк, заключается в том, что я имею дело с некоторыми действительно большими таблицами и не хочу тратить впустую вычислительные ресурсы, пересчитывая значения, которые не будут использоваться.После этого шага я объединил бы столбцы feat_int_sum и feat_int_sum_temp и добавил бы только новую часть данных в hdfs.Я был бы признателен за любые советы о том, как заставить spark применять только оконную функцию к указанному подмножеству, в то время как фактическое окно имеет доступ к строкам за пределами этого подмножества.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...