Spark SQL: используя collect_set над значениями массива? - PullRequest
0 голосов
/ 10 февраля 2019

У меня есть агрегированный DataFrame со столбцом, созданным с использованием collect_set.Теперь мне нужно снова выполнить агрегирование по этому DataFrame и снова применить collect_set к значениям этого столбца.Проблема в том, что мне нужно применить collect_Set к значениям наборов - и пока я вижу, как это сделать, только взорвав агрегированный DataFrame.Есть ли лучший способ?

Пример:

Исходный фрейм данных:

country   | continent   | attributes
-------------------------------------
Canada    | America     | A
Belgium   | Europe      | Z
USA       | America     | A
Canada    | America     | B
France    | Europe      | Y
France    | Europe      | X

Агрегированный фрейм данных (тот, который я получаю в качестве входных данных) - агрегацияболее country:

country   | continent   | attributes
-------------------------------------
Canada    | America     | A, B
Belgium   | Europe      | Z
USA       | America     | A
France    | Europe      | Y, X

Мой желаемый результат - агрегация по continent:

continent   | attributes
-------------------------------------
America     | A, B
Europe      | X, Y, Z

1 Ответ

0 голосов
/ 10 февраля 2019

Поскольку в этот момент вы можете иметь только несколько строк, вы просто собираете атрибуты как есть и выравниваете результат (Spark> = 2,4)

import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}

val byState = Seq(
  ("Canada", "America", Seq("A", "B")),
  ("Belgium", "Europe", Seq("Z")),
  ("USA", "America", Seq("A")),
  ("France", "Europe", Seq("Y", "X"))
).toDF("country", "continent", "attributes")

byState
  .groupBy("continent")
  .agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")
  .show
+---------+----------+
|continent|attributes|
+---------+----------+
|   Europe| [Y, X, Z]|
|  America|    [A, B]|
+---------+----------+

В общем случае всеГораздо сложнее обрабатывать, и во многих случаях, если вы ожидаете больших списков, с большим количеством дубликатов и множеством значений в группе, оптимальным решением * будет просто пересчитать результаты с нуля, то есть

input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")

Одна из возможных альтернативдолжен использовать Aggregator

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
import scala.collection.mutable.{Set => MSet}


class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends 
     Aggregator[T, MSet[U], Seq[U]] with Serializable {

  def zero = MSet.empty[U]

  def reduce(acc: MSet[U], x: T) = {
    for { v <- f(x) } acc.add(v)
    acc
  }

  def merge(acc1: MSet[U], acc2: MSet[U]) = {
    acc1 ++= acc2
  }

  def finish(acc: MSet[U]) = acc.toSeq
  def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]
  def outputEncoder: Encoder[Seq[U]] = enc

}

и применять его следующим образом

case class CountryAggregate(
  country: String, continent: String, attributes: Seq[String])

byState
  .as[CountryAggregate]
  .groupByKey(_.continent)
  .agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)
  .toDF("continent", "attributes")
  .show
+---------+----------+
|continent|attributes|
+---------+----------+
|   Europe| [X, Y, Z]|
|  America|    [B, A]|
+---------+----------+

, но это явно не дружественный для Java вариант.

См. также Как объединить значения в коллекцию после groupBy? (аналогично, но без ограничения уникальности).


* Это потому, что explode может быть довольно дорогим, особенно в старых версиях Spark,такой же, как доступ к внешнему представлению коллекций SQL.

...