Сортировать список в Pyspark, используя udf и numpy - PullRequest
0 голосов
/ 08 февраля 2020

У меня есть фрейм данных PySpark, где второй столбец представляет собой список списков.

Ниже приводится мой фрейм данных PySpark:

+---+------------------------------+
|A  |B                             |
+---+------------------------------+
|a  |[[95.0], [25.0, 25.0], [40.0]]|
|a  |[[95.0], [20.0, 80.0]]        |
|a  |[[95.0], [25.0, 75.0]]        |
|b  |[[95.0], [25.0, 75.0]]        |
|b  |[[95.0], [12.0, 88.0]]        |
+---+------------------------------+

В этом примере я пытаюсь сгладить массивы (во втором столбце), отсортировать массивы и удалить самый большой элемент в последующем массиве numpy .

Ниже приведен вывод, который я ожидаю:

+---+------------------------------+
|A  |B                             |
+---+------------------------------+
|a  |[25.0, 25.0, 40.0]            |
|a  |[20.0, 80.0]                  |
|a  |[25.0, 75.0]                  |
|b  |[25.0, 75.0]                  |
|b  |[12.0, 88.0]                  |
+---+------------------------------+

Ниже представлен udf, который у меня есть в настоящее время:

def remove_highest(col):
    return np.sort( np.asarray([item for sublist in col for item in sublist])  )[:-1]

udf_remove_highest = F.udf( remove_highest , T.ArrayType() )

При попытке получить следующую ошибку чтобы создать этот udf:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-20-6984c2f41293> in <module>()
      2     return np.sort( np.asarray([item for sublist in col for item in sublist])  )[:-1]
      3 
----> 4 udf_remove_highest = F.udf( remove_highest , T.ArrayType() )

TypeError: __init__() missing 1 required positional argument: 'elementType'

Я хотел бы предпочесть udf, который использует numpy массивы. Как я могу достичь вышеупомянутой цели?

1 Ответ

2 голосов
/ 08 февраля 2020

Чтобы ваш код работал, сделайте следующее:

Numpy тип массива не поддерживается как тип данных для искровых фреймов данных, поэтому правильно, когда вы возвращаете преобразованный массив, добавьте .tolist () к он будет отправлен как принятый список python. И добавьте floattype в ваш тип массива

def remove_highest(col):
    return (np.sort( np.asarray([item for sublist in col for item in sublist])  )[:-1]).tolist()

udf_remove_highest = F.udf( remove_highest , T.ArrayType(T.FloatType()) )

Самый эффективный способ сделать это без udfs. Использование функций более высокого порядка:

Это будет работать только для версии 2.4 и выше.

Создание образца данных для примера:

from pyspark.sql import functions as F
from pyspark.sql.types import *

list=[['a',[[95.0], [25.0, 25.0], [40.0]]],
      ['a',[[95.0], [20.0, 80.0]]],
      ['a',[[95.0], [25.0, 75.0]]],
      ['b',[[95.0], [25.0, 75.0]]],
      ['b',[[95.0], [12.0, 88.0]]]]

cSchema = StructType([StructField("A", StringType())\
                      ,StructField("B", ArrayType(ArrayType(FloatType())))])
df= spark.createDataFrame(list,schema=cSchema)

Выражение фильтра с flatten и array_max:

expression="""filter(B, x -> x != C )"""
df1=df.withColumn("B",(F.sort_array(F.flatten("B")))).withColumn("C",F.array_max("B")).withColumn("B", F.expr(expression) )\
.drop("C")
df1.show()

Вывод:

+---+------------------+
|  A|                 B|
+---+------------------+
|  a|[25.0, 25.0, 40.0]|
|  a|      [20.0, 80.0]|
|  a|      [25.0, 75.0]|
|  b|      [25.0, 75.0]|
|  b|      [12.0, 88.0]|
+---+------------------+
...