Как получить минимум вложенных списков в PySpark - PullRequest
0 голосов
/ 09 июня 2019

См. Следующий кадр данных, например,

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([[[1, 2, 3, 4]],[[0, 2, 4]],[[]],[[3]]])
df.show()

Тогда у нас есть

+------------+
|          _1|
+------------+
|[1, 2, 3, 4]|
|   [0, 2, 4]|
|          []|
|         [3]|
+------------+

Тогда я хочу найти минимум каждого списка; используйте -1 в случае пустого списка. Я попробовал следующее, которое не работает.

import pyspark.sql.functions as F
sim_col = F.col('_1')
df.withColumn('min_turn_sim', F.when(F.size(sim_col)==0, -1.0).otherwise(F.min(sim_col))).show()

Ошибка:

AnalysisException: «не удается разрешить» СЛУЧАЙ, КОГДА (_1 IS NULL) THEN -1.0D ELSE min (_1) END 'из-за несоответствия типов данных: выражения THEN и ELSE должны быть одного типа или принудительно передаваться общий тип ;; \ n'Aggregate [_1 # 404, СЛУЧАЙ КОГДА isnull (_1 # 404) ПОСЛЕ -1.0 ПОЛНОСТЬЮ min (_1 # 404) END AS min_turn_sim # 411] \ n + - LogicalRDD [_1 # 404], false \ n «


Функция размера будет работать. Не понимаю, почему «мин» не делает.

df.withColumn('min_turn_sim', F.when(F.size(sim_col)==0, -1.0).otherwise(F.size(sim_col))).show()

+------------+------------+
|          _1|min_turn_sim|
+------------+------------+
|[1, 2, 3, 4]|         4.0|
|   [0, 2, 4]|         3.0|
|          []|        -1.0|
|         [3]|         1.0|
+------------+------------+

1 Ответ

2 голосов
/ 09 июня 2019

min - агрегатная функция - она ​​работает со столбцами, а не со значениями. Поэтому min(sim_col) означает минимальное значение массива во всех строках в области видимости в соответствии с порядком расположения массивов, а не минимальное значение в каждой строке.

Чтобы найти минимум для каждой строки, вам понадобится неагрегированная функция. В последних версиях Spark (2.4.0 и новее) это будет array_min (аналогично array_max для получения максимального значения):

df.withColumn("min_turn_sim", F.coalesce(F.array_min(sim_col), F.lit(-1)))

Более ранние версии требуют UDF:

@F.udf("long")
def long_array_min(xs):
    return min(xs) if xs else -1

df.withColumn("min_turn_sim", F.coalesce(long_array_min(sim_col), F.lit(-1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...