Pyspark SQL Pandas UDF: возвращение массива - PullRequest
0 голосов
/ 21 сентября 2018

Я пытаюсь сделать UDF для панд, который принимает два столбца с целочисленными значениями и на основе разницы между этими значениями, возвращает массив десятичных дробей, длина которых равна вышеупомянутой разнице.

Вот мойПопытка пока, я возился с различными способами, пытаясь заставить это работать, но вот общая идея

import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []

  for i in range(0, (x - y)):
    buffer.append(0.0)

  return buffer #correction provided by Ali Yessili

Вот пример того, как я использую это

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

Конечный результат df с новым столбцом zero_list, представляющим собой столбец ArrayType (DecimalType ()), который выглядит как [0.0, 0.0, 0.0, ...] длина которого (df.x - df.y)

Сообщение об ошибке является настолько общим, что его почти не стоит публиковать, просто «Задание прервано из-за сбоя этапа», и оно прослеживается только до той части моего кода, где я выполняю df.show(),

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

Я надеюсь, что кто-то может указать мне правильное направление, чтобы сделать udf для панд, который будет возвращать массив переменной длины, или просто сказать мне, почему мой код или подход неверен.

Я делаювсе это с использованием блоков данных с искрой 2.3.1.

1 Ответ

0 голосов
/ 22 сентября 2018

Я не понимаю, почему вы возвращаете значение Серии Панд из функции.Он возвращает несколько строк для каждого отдельного ввода.

>>> import pandas as pd
>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return pd.Series(buffer)
... 
>>> zero_pad(5,1)
0    0.0
1    0.0
2    0.0
3    0.0
dtype: float64

Таким образом, вы не можете добавить столбец с результатом, который имеет несколько строк.

И с другой стороны, вы не можете использовать udf напрямую в операторе withColumn.Пожалуйста, смотрите мой сценарий ниже, я думаю, что результат именно то, что вы ищете

>>> from pyspark.sql.functions import udf
>>> 
>>> data = sc.parallelize([
...     (2,1),
...     (8,1),
...     (5,2),
...     (6,4)])
>>> columns = ['x','y']
>>> df = spark.createDataFrame(data, columns)
>>> df.show()
+---+---+
|  x|  y|
+---+---+
|  2|  1|
|  8|  1|
|  5|  2|
|  6|  4|
+---+---+

>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return buffer
... 
>>> my_udf = udf(zero_pad)
>>> df = df.withColumn("zero_list", my_udf(df.x, df.y))
>>> df.show()
+---+---+--------------------+
|  x|  y|           zero_list|
+---+---+--------------------+
|  2|  1|               [0.0]|
|  8|  1|[0.0, 0.0, 0.0, 0...|
|  5|  2|     [0.0, 0.0, 0.0]|
|  6|  4|          [0.0, 0.0]|
+---+---+--------------------+
...