У меня есть UDF, который пользователи Counter используют для поиска n наиболее часто встречающихся элементов в списке;список заполняется с помощью оконной функции через определенное количество строк, предшествующих текущей строке:
@udf(returnType=ArrayType(IntegerType()))
def most_common(x):
"""Find 2 most common elements"""
return [y[0] for y in Counter(x).most_common(n=2)]
# Example dataset:
df = sqlContext.createDataFrame(
[(1, 2, 9), (2, 3, 1), (3, 4, 1), (4, 5, 2),\
(1, 5, 6), (2, 3, 2), (5, 89, 12), (2, 6, 85),\
(1, 5, 6), (2, 12, 2), (5, 9, 12), (2, 65, 85),\
(1, 2, 9), (2, 3, 1), (3, 4, 1), (4, 5, 2),\
(1, 3, 53), (2, 13, 1), (3, 40, 1), (3, 5, 1),],\
("id", "timestamp", "value"))
from pyspark.sql.window import Window
# example window specification:
w = Window.partitionBy("id")\
.orderBy(F.col("timestamp"))\
.rangeBetween(start=-10, end=Window.currentRow)
df_agg = df.select("id", "timestamp", most_common(F.collect_list(F.col("timestamp")).over(w)).alias("top2_hours"),)
Этот UDF отлично работает на локальном экземпляре Spark на Spyder. Смотрите вывод ниже:
\+---+---------+--------------------+----------+
\| id|timestamp| all_hours|top2_hours|
\+---+---------+--------------------+----------+
\| 5| 9| [9]| [9]|
\| 5| 89| [89]| [89]|
\| 1| 2| [2, 2]| [2]|
\| 1| 2| [2, 2]| [2]|
\| 1| 3| [2, 2, 3]| [2, 3]|
\| 1| 5| [2, 2, 3, 5, 5]| [2, 5]|
\| 1| 5| [2, 2, 3, 5, 5]| [2, 5]|
\| 3| 4| [4, 4]| [4]|
\| 3| 4| [4, 4]| [4]|
\| 3| 5| [4, 4, 5]| [4, 5]|
\| 3| 40| [40]| [40]|
\| 2| 3| [3, 3, 3]| [3]|
\| 2| 3| [3, 3, 3]| [3]|
\| 2| 3| [3, 3, 3]| [3]|
\| 2| 6| [3, 3, 3, 6]| [3, 6]|
\| 2| 12| [3, 3, 3, 6, 12]| [3, 6]|
\| 2| 13|[3, 3, 3, 6, 12, 13]| [3, 6]|
\| 2| 65| [65]| [65]|
\| 4| 5| [5, 5]| [5]|
\| 4| 5| [5, 5]| [5]|
\+---+---------+--------------------+----------+
Но я не могу заставить его работать на корпоративном кластере с использованием интерфейса Dataiku. Он возвращает следующую ошибку:
Py4JJavaError: Произошла ошибка при вызове o931.showString. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 4 на этапе 20.0 не выполнена 4 раза, последний сбой: потерянное задание 4.3 на этапе 20.0 (TID 62874, enchbcclprcp117.srv.bmogc.net, исполнитель 1): java.io.IOException: Невозможно запустить программу "": error = 2, нет такого файла или каталога
Я бы не хотел использовать для этого udf, если это возможно. Есть ли способ найти n наиболее часто встречающихся элементов в списке (который заполняется через определенное временное окно, разделенное идентификатором, как в коде выше) изначально / без UDF?