Pyspark - упаковка функций pandas в pandas_udf неожиданным образом взаимодействует с фильтрацией и кэшированием - PullRequest
0 голосов
/ 21 марта 2019

Можно эмулировать функциональность "обратная засыпка" (и "ffill") из панд в Pyspark, используя функции windows.Это работает хорошо, но медленно.Я подумал, что лучшим подходом может быть просто объединить функциональность обратной засыпки панд в pandas_udf , что должно дать ускорение.На первый взгляд кажется, что это работает нормально, но потом я понял, что он очень неожиданно взаимодействует с фильтрацией.

Допустим, у нас есть логический столбец A и числовой столбец B, который равен нулю, если A истинен,Сначала мы запускаем обратную засыпку на B, а затем фильтруем на A. В этом случае все значения в результирующем столбце B все равно равны нулю, если мы используем функцию обратной засыпки pandas_udf!Как ни странно, проблему можно «решить», выполнив cache() после обратной засыпки - что противоречит моему пониманию, что cache() не меняет результат.

Вот пример:

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql.functions import *
import random
from random import randint

# Produce example data
T = range(10**6)
A = [random.uniform(0, 1) < 0.5 for _ in T]
B = [None if a else randint(0, 9) for a in A]
df = sc.parallelize([Row(T=t, A=a, B=b) for (t, a, b) in zip(T, A, B)])
df = df.toDF().cache()

Оба способа выполнения обратной засыпки работают должным образом, но использование pandas_udf в 3 раза быстрее:

w = Window.orderBy('T').rowsBetween(Window.currentRow, Window.unboundedFollowing)

@pandas_udf('long')
def backfill_udf(col):
  return col.fillna(method='backfill')

%timeit -n1 -r1 df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).show()

%timeit -n1 -r1 df.orderBy('t').withColumn('B', backfill_udf(df.B)).show()

Теперь фильтр на A после выполнения обратной засыпки:

# Still works
df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).filter(df.A).show()

# This doesn't work -- all B rows are null
df.orderBy('T').withColumn('B', backfill_udf(df.B)).filter(df.A).show()

# Works again
df.orderBy('T').withColumn('B', backfill_udf(df.B)).cache().filter(df.A).show()

Что именно здесь происходит?Что мне нужно понять, чтобы я больше не удивлялся такому поведению?Как может cache() изменить результат?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...