PySpark: объединить агрегатные и оконные функции - PullRequest
0 голосов
/ 09 января 2020

Я работаю с устаревшим кодом Spark SQL следующим образом:

SELECT
  column1,
  max(column2),
  first_value(column3),
  last_value(column4)
FROM
  tableA
GROUP BY
 column1
ORDER BY
 columnN

Я переписываю его в PySpark, как показано ниже

df.groupBy(column1).agg(max(column2), first(column3), last(column4)).orderBy(columnN)

Когда я сравниваю два результаты Я вижу различия в полях, сгенерированных функциями first_value / first и last_value / last.

Они ведут себя недетерминированно c при использовании вне оконных функций? Можно ли объединять агрегаты groupBy с оконными функциями?

Ответы [ 2 ]

1 голос
/ 09 января 2020

Такое поведение возможно, когда у вас широкая таблица, и вы не указываете порядок для оставшихся столбцов. Под капотом происходит то, что искра занимает строку first() или last(), в зависимости от того, что ей доступно в качестве первой строки соответствия условий в куче. Spark SQL и pyspark могут обращаться к разным элементам, поскольку для остальных столбцов порядок не указан.

С точки зрения функции Window вы можете использовать partitionBy(f.col('column_name')) в вашем Window, который работает как groupBy - он группирует данные по столбцу разделения. Однако, не указывая порядок для всех столбцов, вы можете столкнуться с той же проблемой недетерминированности. Надеюсь, это поможет!

Для полноты картины, я рекомендую вам взглянуть на pyspark do c для функций first() и last() здесь: https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark. sql .functions.first

В частности, следующая заметка проливает свет на то, почему ваше поведение было недетерминированным c:

Примечание Функция недетерминирована c, поскольку ее результаты зависят от порядка строк, которые могут быть недетерминированными c после перестановки.

0 голосов
/ 10 января 2020

Определенно!

import pyspark.sql.functions as F

partition = Window.partitionBy("column1").orderBy("columnN")

data = data.withColumn("max_col2", F.max(F.col("column2")).over(partition))\
           .withColumn("first_col3", F.first(F.col("column3")).over(partition))\
           .withColumn("last_col4", F.last(F.col("column4")).over(partition))

data.show(10, False)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...