Scala / Spark: применить метод к нескольким подмножествам набора данных - PullRequest
0 голосов
/ 23 января 2019

Рассмотрим набор данных со следующей структурой.

case class Order(date: Date, customer: Int, product: Int, quantity: Int)
val data = ...
  .as[Order]

У меня есть метод, который рассчитывает вероятность того, что данный клиент разместит заказ на данный продукт, один период времени в будущем.

//df only contains observations for one customer, and one product
def genPrediction(df: Dataset[Order]): Double = ...

data содержит множество различных комбинаций customer / product.То, что я хочу сделать, это группа data по customer и product, например.val groupedData = data.groupBy("customer", "product") и агрегируйте его, используя genPrediction.Результатом должен быть набор данных со следующей структурой:

case class Prediction(customer: Int, product: Int, probability: Double)

Пишу свой собственный Aggregator способ пойти сюда, или есть более простой подход, который использует мой ранее существовавший метод?Я имею дело с большим количеством данных, поэтому эффективность важна.

Редактировать: Я нашел решение, хотел бы получить некоторую обратную связь о том, является ли это хорошо подход.

val nested: Array[Seq[Order]] = data
  .rdd
  .map(x => (x.customerId + x.productId, x))
  .groupByKey()
  .reduceByKey((x,y) => x ++ y)
  .collect()
  .map(x => x._2.toSeq)

def predictAll(orders: Array[Seq[Order]]): Array[Prediction] = {
  def getPrediction(x: Seq[Order]): Prediction = {
    val df: Dataset[Order] = x.toDF.as[Order]
    Prediction(df.select("week").first.getString(0),
               df.select("customerId").first.getString(0),
               df.select("productId").first.getString(0),
               genPrediction(df))
  }

  @tailrec
  def recPredict(xs: Array[Seq[Order]], acc: Array[Prediction]): 
    Array[Prediction] = xs match {
      case x if x.isEmpty => acc
      case x => recPrediction(x.tail, acc :+ getPrediction(x.head)
  }

  recPrediction(orders, Array[Prediction]())
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...