Как использовать countDistinct с помощью оконной функции в spark / cala? - PullRequest
2 голосов
/ 11 марта 2020

Мне нужно использовать оконную функцию, которая разделена на 2 столбца и выполняет четкий подсчет в 3-м столбце и в качестве 4-го столбца. Я могу считать без каких-либо проблем, но использование отдельного счетчика вызывает исключение -

rg.apache.spark.sql.AnalysisException: Distinct window functions are not supported: 

Есть ли обходной путь для этого?

Ответы [ 3 ]

1 голос
/ 11 марта 2020

В предыдущем ответе предлагалось два возможных метода: приблизительный подсчет и size(collect_set(...)). У обоих есть проблемы.

Если вам нужен точный подсчет, который является основной причиной использования COUNT (DISTINCT ...) в больших данных, приблизительный подсчет не подойдет. Кроме того, приблизительный подсчет фактических коэффициентов ошибок может значительно отличаться для небольших данных.

size(collect_set(...)) может привести к значительному замедлению обработки больших данных, поскольку в нем используется изменяемый Scala HashSet, что довольно медленная структура данных. Кроме того, вы можете иногда получать странные результаты, например, если вы выполняете запрос через пустой фрейм данных, потому что size(null) выдает противоинтуитивное значение -1. Собственный отчетливый подсчет Spark выполняется быстрее по ряду причин, главная из которых заключается в том, что ему не нужно создавать все подсчитанные данные в массиве.

Типичный подход к решению этой проблемы заключается в самостоятельном присоединиться. Вы группируете по любым столбцам, которые вам нужны, вычисляете отдельное число или любую другую статистическую функцию, которую нельзя использовать в качестве оконной функции, а затем присоединяетесь к исходным данным.

1 голос
/ 11 марта 2020

Использование approx_count_distinct (или) collect_set and size функций для окна с функциями mimi c countDistinct.

Example:

df.show()
//+---+---+---+
//|  i|  j|  k|
//+---+---+---+
//|  1|  a|  c|
//|  2|  b|  d|
//|  1|  a|  c|
//|  2|  b|  e|
//+---+---+---+

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("i","j")

df.withColumn("cnt",size(collect_set("k").over(windowSpec))).show()

//or using approx_count_distinct

df.withColumn("cnt",approx_count_distinct("k").over(windowSpec)).show()

//+---+---+---+---+
//|  i|  j|  k|cnt|
//+---+---+---+---+
//|  2|  b|  d|  2|
//|  2|  b|  e|  2|
//|  1|  a|  c|  1| //as c value repeated for 1,a partition
//|  1|  a|  c|  1|
//+---+---+---+---+
0 голосов
/ 22 апреля 2020

Пытаясь улучшить Ответ сима , если вы хотите сделать это:

//val newColumnName: String = ...
//val colToCount: Column = ...
//val aggregatingCols: Seq[Column] = ...

df.withColumn(newColName, countDistinct(colToCount).over(partitionBy(aggregatingCols:_*)))

Вместо этого вы должны сделать это:

//val aggregatingCols: Seq[String] = ...

df.groupBy(aggregatingCols.head, aggregatingCols.tail:_*)
  .agg(countDistinct(colToCount).as(newColName))
  .select(newColName, aggregatingCols:_*)
  .join(df, usingColumns = aggregatingCols)
...