Накопительный продукт в Spark? - PullRequest
0 голосов
/ 25 октября 2018

Я пытаюсь внедрить накопительный продукт в Spark Scala, но я действительно не знаю, как это сделать.У меня есть следующий фрейм данных:

Input data:
+--+--+--------+----+
|A |B | date   | val|
+--+--+--------+----+
|rr|gg|20171103| 2  |
|hh|jj|20171103| 3  |
|rr|gg|20171104| 4  |
|hh|jj|20171104| 5  |
|rr|gg|20171105| 6  |
|hh|jj|20171105| 7  |
+-------+------+----+

И я хотел бы получить следующий вывод

Output data:
+--+--+--------+-----+
|A |B | date   | val |
+--+--+--------+-----+
|rr|gg|20171105| 48  | // 2 * 4 * 6
|hh|jj|20171105| 105 | // 3 * 5 * 7
+-------+------+-----+

Если у вас есть какие-либо идеи о том, как это сделать, это было бы очень полезно:)

Большое спасибо

Ответы [ 2 ]

0 голосов
/ 25 октября 2018

Вы можете решить это, используя collect_list + UDF или UDAF.UDAF может быть более эффективным, но сложнее в реализации из-за локальной агрегации.

Если у вас есть такой кадр данных:

+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  4|
|  b|  5|
+---+---+

Вы можете вызвать UDF:

val prod = udf((vals:Seq[Int]) => vals.reduce(_ * _))

df
  .groupBy($"key")
  .agg(prod(collect_list($"val")).as("val"))
  .show()

+---+---+
|key|val|
+---+---+
|  b| 20|
|  a|  6|
+---+---+
0 голосов
/ 25 октября 2018

До тех пор, пока число строго положительно (0 также может быть обработано, если присутствует, используя coalesce), как в вашем примере, самое простое решение - вычислить сумму логарифмов и взять экспоненту:

import org.apache.spark.sql.functions.{exp, log, max, sum}

val df = Seq(
  ("rr", "gg", "20171103", 2), ("hh", "jj", "20171103", 3), 
  ("rr", "gg", "20171104", 4), ("hh", "jj", "20171104", 5), 
  ("rr", "gg", "20171105", 6), ("hh", "jj", "20171105", 7)
).toDF("A", "B", "date", "val")

val result = df
  .groupBy("A", "B")
  .agg(
    max($"date").as("date"), 
    exp(sum(log($"val"))).as("val"))

Поскольку здесь используется арифметика FP, результат не будет точным:

result.show
+---+---+--------+------------------+
|  A|  B|    date|               val|
+---+---+--------+------------------+
| hh| jj|20171105|104.99999999999997|
| rr| gg|20171105|47.999999999999986|
+---+---+--------+------------------+

, но после округления должен быть достаточным для большинства приложений.

result.withColumn("val", round($"val")).show
+---+---+--------+-----+
|  A|  B|    date|  val|
+---+---+--------+-----+
| hh| jj|20171105|105.0|
| rr| gg|20171105| 48.0|
+---+---+--------+-----+

Если этого недостаточно, вы можете определить UserDefinedAggregateFunction или Aggregator ( Как определить и использовать определяемую пользователем статистическую функцию в Spark SQL? ) или использовать функциональный API с reduceGroups:

import scala.math.Ordering

case class Record(A: String, B: String, date: String, value: Long)

df.withColumnRenamed("val", "value").as[Record]
  .groupByKey(x => (x.A, x.B))
  .reduceGroups((x, y) => x.copy(
    date = Ordering[String].max(x.date, y.date),
    value = x.value * y.value))
  .toDF("key", "value")
  .select($"value.*")
  .show
+---+---+--------+-----+
|  A|  B|    date|value|
+---+---+--------+-----+
| hh| jj|20171105|  105|
| rr| gg|20171105|   48|
+---+---+--------+-----+
...