Чтобы ваш код работал, сделайте следующее:
Numpy тип массива не поддерживается как тип данных для искровых фреймов данных, поэтому правильно, когда вы возвращаете преобразованный массив, добавьте .tolist () к он будет отправлен как принятый список python. И добавьте floattype в ваш тип массива
def remove_highest(col):
return (np.sort( np.asarray([item for sublist in col for item in sublist]) )[:-1]).tolist()
udf_remove_highest = F.udf( remove_highest , T.ArrayType(T.FloatType()) )
Самый эффективный способ сделать это без udfs. Использование функций более высокого порядка:
Это будет работать только для версии 2.4 и выше.
Создание образца данных для примера:
from pyspark.sql import functions as F
from pyspark.sql.types import *
list=[['a',[[95.0], [25.0, 25.0], [40.0]]],
['a',[[95.0], [20.0, 80.0]]],
['a',[[95.0], [25.0, 75.0]]],
['b',[[95.0], [25.0, 75.0]]],
['b',[[95.0], [12.0, 88.0]]]]
cSchema = StructType([StructField("A", StringType())\
,StructField("B", ArrayType(ArrayType(FloatType())))])
df= spark.createDataFrame(list,schema=cSchema)
Выражение фильтра с flatten и array_max:
expression="""filter(B, x -> x != C )"""
df1=df.withColumn("B",(F.sort_array(F.flatten("B")))).withColumn("C",F.array_max("B")).withColumn("B", F.expr(expression) )\
.drop("C")
df1.show()
Вывод:
+---+------------------+
| A| B|
+---+------------------+
| a|[25.0, 25.0, 40.0]|
| a| [20.0, 80.0]|
| a| [25.0, 75.0]|
| b| [25.0, 75.0]|
| b| [12.0, 88.0]|
+---+------------------+