Проблема
У меня есть Spark DataFrame со столбцом, который содержит значения не для каждой строки, а только для некоторых строк (на некоторой регулярной основе, например, только каждые 5-10 строк на основе идентификатора).
Теперь я хотел бы применить оконную функцию к строкам, содержащим значения, включающие две предыдущие и две следующие строки , которые также содержат значения (таким образом, делая вид, что все строки, содержащие нулине существует = не учитывается в направлении rowsBetween
окна).На практике мой эффективный размер окна может быть произвольным в зависимости от того, сколько строк содержит нули.Однако мне всегда нужно ровно два значения до и после.Кроме того, конечный результат должен содержать все строки из-за других столбцов, которые содержат важную информацию.
Пример
Например, я хочу вычислить сумму за предыдущие два, текущий и следующийдва (не нулевых) значения для строк в следующем фрейме данных, которые не равны нулю:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(id=i, val=i * 2 if i % 5 == 0 else None, foo='other') for i in range(100)])
df.show()
Вывод:
+-----+---+----+
| foo| id| val|
+-----+---+----+
|other| 0| 0|
|other| 1|null|
|other| 2|null|
|other| 3|null|
|other| 4|null|
|other| 5| 10|
|other| 6|null|
|other| 7|null|
|other| 8|null|
|other| 9|null|
|other| 10| 20|
|other| 11|null|
|other| 12|null|
|other| 13|null|
|other| 14|null|
|other| 15| 30|
|other| 16|null|
|other| 17|null|
|other| 18|null|
|other| 19|null|
+-----+---+----+
Если я просто использую функцию Window над фреймом данных, какЯ не могу указать условие, что значения не должны быть нулевыми, поэтому окно содержит только нулевые значения, что делает сумму равной значению строки:
df2 = df.withColumn('around_sum', F.when(F.col('val').isNotNull(), F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id')))).otherwise(None))
df2.show()
Результат:
+-----+---+----+----------+
| foo| id| val|around_sum|
+-----+---+----+----------+
|other| 0| 0| 0|
|other| 1|null| null|
|other| 2|null| null|
|other| 3|null| null|
|other| 4|null| null|
|other| 5| 10| 10|
|other| 6|null| null|
|other| 7|null| null|
|other| 8|null| null|
|other| 9|null| null|
|other| 10| 20| 20|
|other| 11|null| null|
|other| 12|null| null|
|other| 13|null| null|
|other| 14|null| null|
|other| 15| 30| 30|
|other| 16|null| null|
|other| 17|null| null|
|other| 18|null| null|
|other| 19|null| null|
+-----+---+----+----------+
Мне удалось достичь желаемого результата, создав второй фрейм данных, содержащий только строки, значения которых не равны NULL, выполнив там оконную операцию, а затем снова присоединив результат:
df3 = df.where(F.col('val').isNotNull())\
.withColumn('around_sum', F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))\
.select(F.col('around_sum'), F.col('id').alias('id2'))
df3 = df.join(df3, F.col('id') == F.col('id2'), 'outer').orderBy(F.col('id')).drop('id2')
df3.show()
Результат:
+-----+---+----+----------+
| foo| id| val|around_sum|
+-----+---+----+----------+
|other| 0| 0| 30|
|other| 1|null| null|
|other| 2|null| null|
|other| 3|null| null|
|other| 4|null| null|
|other| 5| 10| 60|
|other| 6|null| null|
|other| 7|null| null|
|other| 8|null| null|
|other| 9|null| null|
|other| 10| 20| 100|
|other| 11|null| null|
|other| 12|null| null|
|other| 13|null| null|
|other| 14|null| null|
|other| 15| 30| 150|
|other| 16|null| null|
|other| 17|null| null|
|other| 18|null| null|
|other| 19|null| null|
+-----+---+----+----------+
Вопрос
Теперь мне интересно, смогу ли я как-нибудь избавиться от объединения (и второго DataFrame) и вместо этого указать условие в потоке Windowction напрямую.
Возможно ли это?