Я пытаюсь использовать pandas_udf
.
У меня есть Spark DataFrame, в котором у меня есть столбец структуры Array:
root
|-- values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- x: double (nullable = true)
| | |-- y: double (nullable = true)
Я хотел бы запустить pandas_udf над столбцом values
и для каждогозапись (что означает для каждого массива), возвращающая значение, основанное на определенной логике.
Когда в моем источнике данных у меня есть более одной записи со столбцом values
, содержащим пустой массив, я получаю эту ошибку:
line 89, in verify_result_length "expected %d, got %d" % (len(a[0]), len(result)))
RuntimeError: Result vector from pandas_udf was not the required length: expected 2, got 1
Поиск в Google Я нашел здесь исходный код: https://github.com/apache/spark/blob/master/python/pyspark/worker.py
С другой стороны, если у меня только одна запись с пустым массивом, процесс завершается без каких-либо проблем.Фильтрация в Spark записей с пустыми массивами не помогает (FYI .filter("size(values) != 0")
), это приводит к тому же поведению.
Но я не могу понять, почему я получаю эту ошибку и что я делаюнеправильно.Кто-нибудь может помочь?
РЕДАКТИРОВАТЬ
КОД :
import numpy as np
import pandas as pd
import pyspark.sql.functions as f
udf = f.pandas_udf(udf_function, returnType="float")
df.withColumn("newColumn",
f.when(f.size(f.col("values")) == 0, 0)\
.otherwise(udf(f.col("values.x"), f.col("values.y"))))
def udf_function(x_array, y_array, Window=20):
xy = []
data = pd.DataFrame({'x': x_array.tolist()[0], 'y': y_array.tolist()[0]})
x = data['x'].rolling(int(Window / 2), center=True, min_periods=1).sum()
y = data['y'].rolling(int(Window / 2), center=True, min_periods=1).sum()
for index in range(len(x)):
xy.append(np.sqrt(x[index] ** 2 + y[index] ** 2))
if not xy:
result = 0
else:
result = max(xy)
return pd.Series(result)
RECORDS :
{
"values": []
}
{
"values": []
}
{
"values": [
{
"x": -0.638,
"y": 0.879,
},
{
"x": -0.616,
"y": 0.809,
},
{
"x": -0.936,
"y": 0.762,
}]
}
PS Даже если с условием f.when(f.size(f.col("values")) == 0, 0).otherwise(udf() )
предполагается отфильтровать записи с пустыми массивами (или, по крайней мере, в случае пустого массива функция udf не должна вызываться), это выглядит такв любом случае функция udf обрабатывает эти записи!D: Что очень странно для меня
ДАЛЬНЕЙШИЙ КОНТЕКСТ :
Исключение возникло, когда я использовал около 15 или более входных файлов, нет никакой связи с использованиемзаписей с пустыми массивами.В любом случае, это странное исключение, потому что при наблюдаемом поведении я ожидал что-то вроде «JavaOutOfMemory Exception», но сообщаемое исключение не очень помогает мне понять настоящую проблему.
Тем временем я вернулся, используя более стабильный RDD, а не pandas_udf.