Как передать массив в пользовательскую функцию агрегирования в Spark (UDAF) - PullRequest
0 голосов
/ 31 мая 2019

Я хотел бы передать массив в качестве входной схемы в UDAF.

Пример, который я привел, довольно прост, он просто суммирует 2 вектора. На самом деле мой вариант использования более сложный, и мне нужно использовать UDAF.

import sc.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._

val df = Seq(
  (1, Array(10.2, 12.3, 11.2)),
  (1, Array(11.2, 12.6, 10.8)),
  (2, Array(12.1, 11.2, 10.1)),
  (2, Array(10.1, 16.0, 9.3)) 
  ).toDF("siteId", "bidRevenue")


class BidAggregatorBySiteId() extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("bidRevenue", ArrayType(DoubleType))))

  def bufferSchema = StructType(Array(StructField("sumArray", ArrayType(DoubleType))))

  def dataType: DataType = ArrayType(DoubleType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array(0.0, 0.0, 0.0))
      }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
      val seqBuffer = buffer(0).asInstanceOf[IndexedSeq[Double]]
      val seqInput = input(0).asInstanceOf[IndexedSeq[Double]]
      buffer(0) = seqBuffer.zip(seqInput).map{ case (x, y) => x + y }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
     val seqBuffer1 = buffer1(0).asInstanceOf[IndexedSeq[Double]]
     val seqBuffer2 = buffer2(0).asInstanceOf[IndexedSeq[Double]]
     buffer1(0) = seqBuffer1.zip(seqBuffer2).map{ case (x, y) => x + y }
  }

  def evaluate(buffer: Row) = { 
    buffer
  }
}
val fun = new BidAggregatorBySiteId()

df.select($"siteId", $"bidRevenue" cast(ArrayType(DoubleType)))
.groupBy("siteId").agg(fun($"bidRevenue"))
.show

Все отлично работает для преобразований перед действием "show". Но шоу поднимает ошибку:

scala.MatchError: [WrappedArray (21.4, 24.9, 22.0)] (класса org.apache.spark.sql.execution.aggregate.InputAggregationBuffer) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter.toCatalystImpl (CatalystTypeConverters.scala: 160)

Структура моего фрейма данных:

root
 |-- siteId: integer (nullable = false)
 |-- bidRevenue: array (nullable = true)
 |    |-- element: double (containsNull = true)

df.dtypes = Array [(String, String)] = Array (("siteId", "IntegerType"), ("bidRevenue", "ArrayType (DoubleType, true)"))

Танки для вас ценная помощь.

1 Ответ

0 голосов
/ 02 июня 2019
def evaluate(buffer: Row): Any

Вышеупомянутый метод вызывается после полной обработки группы для получения окончательного результата.Поскольку вы инициализируете и обновляете только 0-й индекс буфера

i.e. buffer(0)  

Таким образом, вам необходимо вернуть 0-е значение индекса в конце, поскольку ваши агрегированные результаты хранятся в 0 индексе.

  def evaluate(buffer: Row) = {
    buffer.get(0)
  }

Вышеуказанная модификация метода valid () приведет к:

// +------+---------------------------------+
// |siteId|bidaggregatorbysiteid(bidRevenue)|
// +------+---------------------------------+
// |     1|               [21.4, 24.9, 22.0]|
// |     2|               [22.2, 27.2, 19.4]|
// +------+---------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...