Как выполнить произвольные вычисления для групп записей в кадре данных Spark? - PullRequest
0 голосов
/ 02 мая 2018

У меня есть такой фрейм данных:

|-----+-----+-------+---------|
| foo | bar | fox   | cow     |
|-----+-----+-------+---------|
|   1 |   2 | red   | blue    | // row 0
|   1 |   2 | red   | yellow  | // row 1
|   2 |   2 | brown | green   | // row 2
|   3 |   4 | taupe | fuschia | // row 3
|   3 |   4 | red   | orange  | // row 4
|-----+-----+-------+---------|

Мне нужно сгруппировать записи по "foo" и "bar", а затем выполнить некоторые магические вычисления для "fox" и "cow", чтобы создать "badger", который может вставлять или удалять записи:

|-----+-----+-------+---------+---------|
| foo | bar | fox   | cow     | badger  |
|-----+-----+-------+---------+---------|
|   1 |   2 | red   | blue    | zebra   |
|   1 |   2 | red   | blue    | chicken |
|   1 |   2 | red   | yellow  | cougar  |
|   2 |   2 | brown | green   | duck    |
|   3 |   4 | red   | orange  | peacock |
|-----+-----+-------+---------+---------|

(В этом примере строка 0 была разделена на два значения «барсука», а строка 3 была удалена из окончательного вывода.)

Мой лучший подход на данный момент выглядит так:

val groups = df.select("foo", "bar").distinct
groups.flatMap(row => {
  val (foo, bar): (String, String) = (row(0), row(1))
  val group: DataFrame = df.where(s"foo == '$foo' AND bar == '$bar'")
  val rowsWithBadgers: List[Row] = makeBadgersFor(group)
  rowsWithBadgers
})

У этого подхода есть несколько проблем:

  1. Неуместно сопоставлять foo и bar по отдельности. (Утилита может это исправить, так что ничего страшного.)
  2. Выдает ошибку Invalid tree: null\nnull из-за вложенной операции, в которой я пытаюсь сослаться на df изнутри groups.flatMap. Пока не знаю, как обойти это.
  3. Я не уверен, действительно ли это отображение и фильтрация эффективно используют вычисления, распределенные Spark.

Есть ли более эффективный и / или элегантный подход к этой проблеме?

Этот вопрос очень похож на Spark DataFrame: работать с группами , но я включаю его здесь, потому что 1) не ясно, требует ли этот вопрос добавление и удаление записей, и 2) ответы в этом вопросе устарели и не хватает деталей.

Я не вижу способа сделать это с помощью groupBy и определяемой пользователем статистической функции , потому что функция агрегирования агрегирует в одну строку. Другими словами,

udf(<records with foo == 'foo' && bar == 'bar'>) => [foo,bar,aggregatedValue]

Мне нужно, возможно, вернуть две или более разных строк или ноль строк после анализа моей группы. Я не вижу способа, чтобы функции агрегации могли это сделать - если у вас есть пример, пожалуйста, поделитесь.

1 Ответ

0 голосов
/ 30 мая 2018

Можно использовать пользовательскую функцию. Одна возвращенная строка может содержать список. Затем вы можете разбить список на несколько строк и восстановить столбцы.

Агрегатор:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders.kryo
import org.apache.spark.sql.expressions.Aggregator

case class StuffIn(foo: BigInt, bar: BigInt, fox: String, cow: String)
case class StuffOut(foo: BigInt, bar: BigInt, fox: String, cow: String, badger: String)
object StuffOut {
  def apply(stuffIn: StuffIn): StuffOut = new StuffOut(stuffIn.foo, 
stuffIn.bar, stuffIn.fox, stuffIn.cow, "dummy")
}

object MultiLineAggregator extends Aggregator[StuffIn, Seq[StuffOut], Seq[StuffOut]] {
  def zero: Seq[StuffOut] = Seq[StuffOut]()
  def reduce(buffer: Seq[StuffOut], stuff: StuffIn): Seq[StuffOut] = {
    makeBadgersForDummy(buffer, stuff)
  }

  def merge(b1: Seq[StuffOut], b2: Seq[StuffOut]): Seq[StuffOut] = {
    b1 ++: b2
  }
  def finish(reduction: Seq[StuffOut]): Seq[StuffOut] = reduction
  def bufferEncoder: Encoder[Seq[StuffOut]] = kryo[Seq[StuffOut]]
  def outputEncoder: Encoder[Seq[StuffOut]] = kryo[Seq[StuffOut]]
}

Звонок:

val averageSalary: TypedColumn[StuffIn, Seq[StuffOut]] = MultiLineAggregator.toColumn

val res: DataFrame =
  ds.groupByKey(x => (x.foo, x.bar))
          .agg(averageSalary)
          .map(_._2)
          .withColumn("value", explode($"value"))
          .withColumn("foo", $"value.foo")
          .withColumn("bar", $"value.bar")
          .withColumn("fox", $"value.fox")
          .withColumn("cow", $"value.cow")
          .withColumn("badger", $"value.badger")
          .drop("value")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...