Как передать все оконные значения в pyspark UDF - PullRequest
0 голосов
/ 15 февраля 2019

Я хочу выполнить следующие операции над фреймом данных:

  1. Столбец Groupby
  2. Данные окна
  3. выполнить (udf) пользовательскую операцию с данными окна

Вот пример кода, который я пробовал:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
ss = SparkSession.builder
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col

sparkSession = ss.getOrCreate()

sc = sparkSession.sparkContext
sc.setLogLevel("FATAL")

df = sparkSession.createDataFrame([(17.00, "2018-03-10"),
                                   (13.00, "2018-03-11"),
                                   (25.00, "2018-03-12"),
                                   (20.00, "2018-03-13"),
                                   (17.00, "2018-03-14"),
                                   (99.00, "2018-03-15"),
                                   (156.00, "2018-03-22"),
                                   (17.00, "2018-03-31"),
                                   (25.00, "2018-03-15"),
                                   (25.00, "2018-03-16")
                                   ],
                                  ["id", "ts"])

w = F.window(col("ts").cast("timestamp"), "10 days")
windo = w.alias("window")

@udf(ArrayType(FloatType()))
def new_tuple(x):
    #print(type(x))
    return x

df.groupBy("id", windo).agg(new_tuple(F.collect_list("id"))).show(truncate=False)

Выше код дает мне то, что я хочу.Однако я не уверен насчет метода "collect_list".

Я тоже пробовал UDF для панд.Я получаю ожидаемый результат (см. Ниже), используя панд.Однако метод apply не возвращает столбец окна.

Вопросы :

  1. Работает ли collect_list на рабочем узле или узле драйвера?Этот код может не масштабироваться, если collect_list соберет все результаты в мастер-узел.

  2. Есть ли эффективный способ получить следующий вывод без collect_list?

  3. Я читаю UDF панд эффективны.Однако я не знаю, как передать / вернуть столбец окна назад.

Ожидаемый результат :

+-----+------------------------------------------+---------------------------------+
|id   |window                                    |new_tuple(collect_list(id, 0, 0))|
+-----+------------------------------------------+---------------------------------+
|17.0 |[2018-03-29 19:00:00, 2018-04-08 19:00:00]|[17.0]                           |
|25.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[25.0, 25.0, 25.0]               |
|13.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[13.0]                           |
|99.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[99.0]                           |
|156.0|[2018-03-19 19:00:00, 2018-03-29 19:00:00]|[156.0]                          |
|20.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[20.0]                           |
|17.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[17.0, 17.0]                     |
+-----+------------------------------------------+---------------------------------+

Вопросздесь не дает ответов на мои вопросы.Я применяю оконную операцию к сгруппированным данным.

...