Рассмотрим набор данных со следующей структурой.
case class Order(date: Date, customer: Int, product: Int, quantity: Int)
val data = ...
.as[Order]
У меня есть метод, который рассчитывает вероятность того, что данный клиент разместит заказ на данный продукт, один период времени в будущем.
//df only contains observations for one customer, and one product
def genPrediction(df: Dataset[Order]): Double = ...
data
содержит множество различных комбинаций customer
/ product
.То, что я хочу сделать, это группа data
по customer
и product
, например.val groupedData = data.groupBy("customer", "product")
и агрегируйте его, используя genPrediction
.Результатом должен быть набор данных со следующей структурой:
case class Prediction(customer: Int, product: Int, probability: Double)
Пишу свой собственный Aggregator
способ пойти сюда, или есть более простой подход, который использует мой ранее существовавший метод?Я имею дело с большим количеством данных, поэтому эффективность важна.
Редактировать: Я нашел решение, хотел бы получить некоторую обратную связь о том, является ли это хорошо подход.
val nested: Array[Seq[Order]] = data
.rdd
.map(x => (x.customerId + x.productId, x))
.groupByKey()
.reduceByKey((x,y) => x ++ y)
.collect()
.map(x => x._2.toSeq)
def predictAll(orders: Array[Seq[Order]]): Array[Prediction] = {
def getPrediction(x: Seq[Order]): Prediction = {
val df: Dataset[Order] = x.toDF.as[Order]
Prediction(df.select("week").first.getString(0),
df.select("customerId").first.getString(0),
df.select("productId").first.getString(0),
genPrediction(df))
}
@tailrec
def recPredict(xs: Array[Seq[Order]], acc: Array[Prediction]):
Array[Prediction] = xs match {
case x if x.isEmpty => acc
case x => recPrediction(x.tail, acc :+ getPrediction(x.head)
}
recPrediction(orders, Array[Prediction]())
}