Спарк функции высшего порядка для вычисления лучших N продуктов из списка через запятую - PullRequest
0 голосов
/ 04 октября 2019

Я использую Spark 2.4, и у меня есть искровой фрейм данных, который имеет 2 столбца - id и product_list. Данные состоят из списка продуктов, с которыми взаимодействовал каждый id.

вот пример кода -

scala> spark.version
res3: String = 2.4.3

val df = Seq(
("1", "p1,p1,p1,p1,p1,p3,p3,p2,p2,p2,p2"),
("2", "p2,p2,p2,p2,p2,p4,p4,p4,p1,p3")
).toDF("id", "product_list")
df.createOrReplaceTempView("df")


+---+--------------------------------+
|id |product_list                    |
+---+--------------------------------+
|1  |p1,p1,p1,p1,p1,p3,p3,p2,p2,p2,p2|
|2  |p2,p2,p2,p2,p2,p4,p4,p4,p1,p3   |
+---+--------------------------------+

Я хотел бы вернуть те top 2 продукты, которые каждый id взаимодействовал с. Например, id = 1 просмотрел продукты p1 - 5 times и p2 - 4 times, поэтому я хотел бы вернуть p1,p2 для id = 1. Точно так же, p2,p4 для id = 2.

Мой окончательный вывод должен выглядеть следующим образом:

id, most_seen_products
1, p1,p2
2, p2,p4

Поскольку я использую Spark 2.4, мне было интересно, есть ли функция более высокого порядка, которая сначала преобразовывает этот список в массив, а затем возвращает верхнюю2 просмотренных товаров. В целом код должен обрабатывать top N продуктов.

Ответы [ 3 ]

1 голос
/ 04 октября 2019

Вот мой подход

  val df = Seq(
      ("1", "p1,p1,p1,p1,p1,p3,p3,p2,p2,p2,p2"),
      ("2", "p2,p2,p2,p2,p2,p4,p4,p4,p1,p3")
    ).toDF("id", "product_list")


 def getMetrics(value: Row, n: Int): (String, String) = {

    val split = value.getAs[String]("product_list").split(",")

    val sortedRecords = split.groupBy(x => x).map(data => (data._1, data._2.size)).toList.sortWith(_._2 > _._2)
    (value.getAs[String]("id"), sortedRecords.take(n).map(_._1).mkString(","))

  }


   df.map(value =>
      getMetrics(value, 2)
    ).withColumnRenamed("_1", "id").withColumnRenamed("_2", "most_seen_products") show (false)

Результат

+---+------------------+
|id |most_seen_products|
+---+------------------+
|1  |p1,p2             |
|2  |p2,p4             |
+---+------------------+
0 голосов
/ 04 октября 2019
scala> import org.apache.spark.sql.expressions.UserDefinedFunction
scala> import scala.collection.immutable.ListMap
scala> def max_products:UserDefinedFunction = udf((product:String) => {
val productList = product.split(",").toList
val finalList = ListMap(productList.groupBy(i=>i).mapValues(_.size).toSeq.sortWith(_._2 > _._2):_*).keys.toList
finalList(0) + "," + finalList(1)
})

scala> df.withColumn("most_seen_products", max_products(col("product_list"))).show(false)
+---+--------------------------------+------------------+
|id |product_list                    |most_seen_products|
+---+--------------------------------+------------------+
|1  |p1,p1,p1,p1,p1,p3,p3,p2,p2,p2,p2|p1,p2             |
|2  |p2,p2,p2,p2,p2,p4,p4,p4,p1,p3   |p2,p4             |
+---+--------------------------------+------------------+
0 голосов
/ 04 октября 2019

Глядя на ваш формат данных, вы можете просто использовать .map () или, в случае SQL, UDF, который преобразует все строки. Функция будет:

productList => {
    // list of products = split productList by comma
    // add all items to a String/Count map
    // sort the map, get first 2 elements
    // return string.join of those 2 elements
}
...