Эффективно рассчитать топ-k элементов в искре - PullRequest
1 голос
/ 23 мая 2019

У меня есть датафрейм, аналогичный:

+---+-----+-----+
|key|thing|value|
+---+-----+-----+
| u1|  foo|    1|
| u1|  foo|    2|
| u1|  bar|   10|
| u2|  foo|   10|
| u2|  foo|    2|
| u2|  bar|   10|
+---+-----+-----+

И хотите получить результат:

+---+-----+---------+----+
|key|thing|sum_value|rank|
+---+-----+---------+----+
| u1|  bar|       10|   1|
| u1|  foo|        3|   2|
| u2|  foo|       12|   1|
| u2|  bar|       10|   2|
+---+-----+---------+----+

В настоящее время существует код, аналогичный:

val df = Seq(("u1", "foo", 1), ("u1", "foo", 2), ("u1", "bar", 10), ("u2", "foo", 10), ("u2", "foo", 2), ("u2", "bar", 10)).toDF("key", "thing", "value")

 // calculate sums per key and thing
 val aggregated = df.groupBy("key", "thing").agg(sum("value").alias("sum_value"))

 // get topk items per key
 val k = lit(10)
 val topk = aggregated.withColumn("rank", rank over  Window.partitionBy("key").orderBy(desc("sum_value"))).filter('rank < k)

Однако этот код очень неэффективен . Оконная функция генерирует общий порядок предметов и вызывает гигантский случайный случай .

Как я могу более эффективно рассчитывать топ-k предметов? Возможно, используя приблизительные функции, то есть наброски аналогично https://datasketches.github.io/ или https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html

Ответы [ 2 ]

1 голос
/ 23 июля 2019

Это классический алгоритм рекомендательных систем.

case class Rating(thing: String, score: Int) extends Ordered[Rating] {
  def compare(that: Rating): Int = -this.score.compare(that.score)
}

case class Recommendation(key: Int, ratings: Seq[Rating]) {
  def keep(n: Int) = this.copy(ratings = ratings.sorted.take(n))
}

val TOPK = 10

df.groupBy('key)
  .agg(collect_list(struct('thing, 'value)) as "ratings")
  .as[Recommendation]
  .map(_.keep(TOPK))

Вы также можете проверить исходный код по адресу:

  • Spotify Big Data Rosetta Code / TopItemsPerUser.scala, несколько решений для Spark или Scio
  • Spark MLLib / TopByKeyAggregator.scala, считающийся лучшей практикой при использовании их алгоритма рекомендации, похоже, что в их примерах все еще используется RDD.
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

sc.parallelize(Array(("u1", ("foo", 1)), ("u1", ("foo", 2)), ("u1", ("bar", 10)), ("u2", ("foo", 10)),
  ("u2", ("foo", 2)), ("u2", ("bar", 10))))
  .topByKey(10)(Ordering.by(_._2))

0 голосов
/ 23 мая 2019

RDD`s на помощь

aggregated.as[(String, String, Long)].rdd.groupBy(_._1).map{ case (thing, it) => (thing, it.map(e=> (e._2, e._3)).toList.sortBy(sorter => sorter._2).take(1))}.toDF.show
+---+----------+
| _1|        _2|
+---+----------+
| u1| [[foo,3]]|
| u2|[[bar,10]]|
+---+----------+

Скорее всего, это можно улучшить, воспользовавшись предложением из комментария. То есть когда не начинается с aggregated, а скорее df. Это может выглядеть примерно так:

df.as[(String, String, Long)].rdd.groupBy(_._1).map{case (thing, it) => {
      val aggregatedInner = it.groupBy(e=> (e._2)).mapValues(events=> events.map(value => value._3).sum)
      val topk = aggregatedInner.toArray.sortBy(sorter=> sorter._2).take(1)
      (thing, topk)
    }}.toDF.show
...