Можно эмулировать функциональность "обратная засыпка" (и "ffill") из панд в Pyspark, используя функции windows.Это работает хорошо, но медленно.Я подумал, что лучшим подходом может быть просто объединить функциональность обратной засыпки панд в pandas_udf , что должно дать ускорение.На первый взгляд кажется, что это работает нормально, но потом я понял, что он очень неожиданно взаимодействует с фильтрацией.
Допустим, у нас есть логический столбец A и числовой столбец B, который равен нулю, если A истинен,Сначала мы запускаем обратную засыпку на B, а затем фильтруем на A. В этом случае все значения в результирующем столбце B все равно равны нулю, если мы используем функцию обратной засыпки pandas_udf!Как ни странно, проблему можно «решить», выполнив cache()
после обратной засыпки - что противоречит моему пониманию, что cache()
не меняет результат.
Вот пример:
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql.functions import *
import random
from random import randint
# Produce example data
T = range(10**6)
A = [random.uniform(0, 1) < 0.5 for _ in T]
B = [None if a else randint(0, 9) for a in A]
df = sc.parallelize([Row(T=t, A=a, B=b) for (t, a, b) in zip(T, A, B)])
df = df.toDF().cache()
Оба способа выполнения обратной засыпки работают должным образом, но использование pandas_udf в 3 раза быстрее:
w = Window.orderBy('T').rowsBetween(Window.currentRow, Window.unboundedFollowing)
@pandas_udf('long')
def backfill_udf(col):
return col.fillna(method='backfill')
%timeit -n1 -r1 df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).show()
%timeit -n1 -r1 df.orderBy('t').withColumn('B', backfill_udf(df.B)).show()
Теперь фильтр на A
после выполнения обратной засыпки:
# Still works
df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).filter(df.A).show()
# This doesn't work -- all B rows are null
df.orderBy('T').withColumn('B', backfill_udf(df.B)).filter(df.A).show()
# Works again
df.orderBy('T').withColumn('B', backfill_udf(df.B)).cache().filter(df.A).show()
Что именно здесь происходит?Что мне нужно понять, чтобы я больше не удивлялся такому поведению?Как может cache()
изменить результат?