Pyspark: найти N наиболее часто встречающихся элементов в списке без UDF - PullRequest
0 голосов
/ 08 октября 2019

У меня есть UDF, который пользователи Counter используют для поиска n наиболее часто встречающихся элементов в списке;список заполняется с помощью оконной функции через определенное количество строк, предшествующих текущей строке:

@udf(returnType=ArrayType(IntegerType()))
def most_common(x):
    """Find 2 most common elements"""
    return [y[0] for y in Counter(x).most_common(n=2)]

# Example dataset:
df = sqlContext.createDataFrame(
   [(1, 2, 9), (2, 3, 1), (3, 4, 1), (4, 5, 2),\
    (1, 5, 6), (2, 3, 2), (5, 89, 12), (2, 6, 85),\
    (1, 5, 6), (2, 12, 2), (5, 9, 12), (2, 65, 85),\
    (1, 2, 9), (2, 3, 1), (3, 4, 1), (4, 5, 2),\
    (1, 3, 53), (2, 13, 1), (3, 40, 1), (3, 5, 1),],\
   ("id", "timestamp", "value"))

from pyspark.sql.window import Window
# example window specification:
w = Window.partitionBy("id")\
          .orderBy(F.col("timestamp"))\
          .rangeBetween(start=-10, end=Window.currentRow)

df_agg = df.select("id", "timestamp", most_common(F.collect_list(F.col("timestamp")).over(w)).alias("top2_hours"),)

Этот UDF отлично работает на локальном экземпляре Spark на Spyder. Смотрите вывод ниже:

    \+---+---------+--------------------+----------+
    \| id|timestamp|           all_hours|top2_hours|
    \+---+---------+--------------------+----------+
    \|  5|        9|                 [9]|       [9]|
    \|  5|       89|                [89]|      [89]|
    \|  1|        2|              [2, 2]|       [2]|
    \|  1|        2|              [2, 2]|       [2]|
    \|  1|        3|           [2, 2, 3]|    [2, 3]|
    \|  1|        5|     [2, 2, 3, 5, 5]|    [2, 5]|
    \|  1|        5|     [2, 2, 3, 5, 5]|    [2, 5]|
    \|  3|        4|              [4, 4]|       [4]|
    \|  3|        4|              [4, 4]|       [4]|
    \|  3|        5|           [4, 4, 5]|    [4, 5]|
    \|  3|       40|                [40]|      [40]|
    \|  2|        3|           [3, 3, 3]|       [3]|
    \|  2|        3|           [3, 3, 3]|       [3]|
    \|  2|        3|           [3, 3, 3]|       [3]|
    \|  2|        6|        [3, 3, 3, 6]|    [3, 6]|
    \|  2|       12|    [3, 3, 3, 6, 12]|    [3, 6]|
    \|  2|       13|[3, 3, 3, 6, 12, 13]|    [3, 6]|
    \|  2|       65|                [65]|      [65]|
    \|  4|        5|              [5, 5]|       [5]|
    \|  4|        5|              [5, 5]|       [5]|
    \+---+---------+--------------------+----------+

Но я не могу заставить его работать на корпоративном кластере с использованием интерфейса Dataiku. Он возвращает следующую ошибку:

Py4JJavaError: Произошла ошибка при вызове o931.showString. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 4 на этапе 20.0 не выполнена 4 раза, последний сбой: потерянное задание 4.3 на этапе 20.0 (TID 62874, enchbcclprcp117.srv.bmogc.net, исполнитель 1): java.io.IOException: Невозможно запустить программу "": error = 2, нет такого файла или каталога

Я бы не хотел использовать для этого udf, если это возможно. Есть ли способ найти n наиболее часто встречающихся элементов в списке (который заполняется через определенное временное окно, разделенное идентификатором, как в коде выше) изначально / без UDF?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...