RuntimeError: Вектор результата из pandas_udf не был требуемой длины: ожидал 2, получил 1 - PullRequest
0 голосов
/ 26 октября 2018

Я пытаюсь использовать pandas_udf.

У меня есть Spark DataFrame, в котором у меня есть столбец структуры Array:

root
 |-- values: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)

Я хотел бы запустить pandas_udf над столбцом values и для каждогозапись (что означает для каждого массива), возвращающая значение, основанное на определенной логике.

Когда в моем источнике данных у меня есть более одной записи со столбцом values, содержащим пустой массив, я получаю эту ошибку:

line 89, in verify_result_length "expected %d, got %d" % (len(a[0]), len(result)))

RuntimeError: Result vector from pandas_udf was not the required length: expected 2, got 1

Поиск в Google Я нашел здесь исходный код: https://github.com/apache/spark/blob/master/python/pyspark/worker.py

С другой стороны, если у меня только одна запись с пустым массивом, процесс завершается без каких-либо проблем.Фильтрация в Spark записей с пустыми массивами не помогает (FYI .filter("size(values) != 0")), это приводит к тому же поведению.

Но я не могу понять, почему я получаю эту ошибку и что я делаюнеправильно.Кто-нибудь может помочь?

РЕДАКТИРОВАТЬ

КОД :

import numpy as np
import pandas as pd
import pyspark.sql.functions as f

udf = f.pandas_udf(udf_function, returnType="float")
df.withColumn("newColumn",
             f.when(f.size(f.col("values")) == 0, 0)\
             .otherwise(udf(f.col("values.x"), f.col("values.y"))))

def udf_function(x_array, y_array, Window=20):
   xy = []
   data = pd.DataFrame({'x': x_array.tolist()[0], 'y': y_array.tolist()[0]})

   x = data['x'].rolling(int(Window / 2), center=True, min_periods=1).sum()
   y = data['y'].rolling(int(Window / 2), center=True, min_periods=1).sum()

   for index in range(len(x)):
       xy.append(np.sqrt(x[index] ** 2 + y[index] ** 2))

   if not xy:
       result = 0
   else:
       result = max(xy)

   return pd.Series(result)

RECORDS :

{
  "values": []
}

{
  "values": []
}

{
  "values": [
    {
      "x": -0.638,
      "y": 0.879,
    },
    {
      "x": -0.616,
      "y": 0.809,
    },
    {
      "x": -0.936,
      "y": 0.762,
    }]
}

PS Даже если с условием f.when(f.size(f.col("values")) == 0, 0).otherwise(udf() ) предполагается отфильтровать записи с пустыми массивами (или, по крайней мере, в случае пустого массива функция udf не должна вызываться), это выглядит такв любом случае функция udf обрабатывает эти записи!D: Что очень странно для меня

ДАЛЬНЕЙШИЙ КОНТЕКСТ :

Исключение возникло, когда я использовал около 15 или более входных файлов, нет никакой связи с использованиемзаписей с пустыми массивами.В любом случае, это странное исключение, потому что при наблюдаемом поведении я ожидал что-то вроде «JavaOutOfMemory Exception», но сообщаемое исключение не очень помогает мне понять настоящую проблему.

Тем временем я вернулся, используя более стабильный RDD, а не pandas_udf.

...