Поиск всех пар пользователей, которые имеют определенное количество общих значений - PullRequest
0 голосов
/ 29 октября 2018

Я новичок в спарке и пытаюсь найти конкретную информацию о паре списков данных, которые я преобразовал в два отдельных фрейма данных.

Два кадра данных:

Users:                item_Details:
user_id | item_id     item_id | item_name
-----------------     ----------------------
  1     | 123           123   |  phone
  2     | 223           223   |  game
  3     | 423           423   |  foo
  2     | 1223          1223  |  bar
  1     | 3213          3213  | foobar

Мне нужно найти все пары пользователей, которые имеют более 50 общих элементов и отсортированы по количеству элементов. Не должно быть дубликатов, означающих, что должен быть только один набор userId 1 и userId 2.

Результат должен выглядеть так:

user_id1 | user_id2 | count_of_items | list_of_items
-------------------------------------------------------------
    1    |     2    |       51       |  phone,foo,bar,foobar

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

Другое решение, без использования UDF. Поскольку нам нужны общие элементы, сопоставление может быть дано в самом joinExprs. Проверьте это

val users = Seq(
  (1, 123), (1, 223), (1, 423),
  (2, 123), (2, 423), (2, 1223), (2, 3213),
  (3, 223), (3, 423), (3, 1223), (3, 3213),
  (4, 123), (4, 1223), (4, 3213)
).toDF("user_id", "item_id")

val items = Seq(
  (123, "phone"), (223, "game"), (423, "foo"), (1223, "bar"), (3213, "foobar")
)toDF("item_id", "item_name")

val common_items =
  users.as("t1").join(users.as("t2"),$"t1.user_id" < $"t2.user_id" and $"t1.item_id" === $"t2.item_id" )
      .join(items.as("it"),$"t1.item_id"===$"it.item_id","inner")
      .groupBy($"t1.user_id",$"t2.user_id")
      .agg(collect_set('item_name).as("items"))
      .filter(size('items)>2) // change here for count
      .withColumn("size",size('items))

common_items.show(false)

Результаты

+-------+-------+--------------------+----+
|user_id|user_id|items               |size|
+-------+-------+--------------------+----+
|2      |3      |[bar, foo, foobar]  |3   |
|2      |4      |[bar, foobar, phone]|3   |
+-------+-------+--------------------+----+
0 голосов
/ 29 октября 2018

Вот один из подходов:

  1. сборка item pairs за отдельную пару пользователей через самостоятельное соединение
  2. генерирует common items из item pairs с использованием UDF
  3. отфильтровать результирующий набор данных по общему количеству элементов

как показано ниже:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

val users = Seq(
  (1, 123), (1, 223), (1, 423),
  (2, 123), (2, 423), (2, 1223), (2, 3213),
  (3, 223), (3, 423), (3, 1223), (3, 3213),
  (4, 123), (4, 1223), (4, 3213)
).toDF("user_id", "item_id")

val item_details = Seq(
  (123, "phone"), (223, "game"), (423, "foo"), (1223, "bar"), (3213, "foobar")
)toDF("item_id", "item_name")

val commonItems = udf( (itemPairs: Seq[Row]) =>
  itemPairs.collect{ case Row(a: Int, b: Int) if a == b => a }
)

val commonLimit = 2  // Replace this with any specific common item count

val user_common_items =
  users.as("u1").join(users.as("u2"), $"u1.user_id" < $"u2.user_id").
  groupBy($"u1.user_id", $"u2.user_id").agg(
    collect_set(
      struct($"u1.item_id".as("ui1"), $"u2.item_id".as("ui2"))
    ).as("item_pairs")).
  withColumn("common_items", commonItems($"item_pairs")).
  drop("item_pairs").
  where(size($"common_items") > commonLimit)

user_common_items.show(false)
// +-------+-------+-----------------+
// |user_id|user_id|common_items     |
// +-------+-------+-----------------+
// |2      |3      |[423, 3213, 1223]|
// |2      |4      |[3213, 123, 1223]|
// +-------+-------+-----------------+

Если требуются общие имена элементов вместо идентификаторов элементов, вы можете присоединиться к item_details на вышеприведенном шаге для агрегирования по именам элементов; или вы можете взорвать существующий common item ids и присоединиться к item_details вместе с агрегацией collect_list по паре пользователей:

user_common_items.
  withColumn("item_id", explode($"common_items")).
  join(item_details, Seq("item_id")).
  groupBy($"u1.user_id", $"u2.user_id").agg(collect_list($"item_name").as("common_items")).
  withColumn("item_count", size($"common_items")).
  show
// +-------+-------+--------------------+----------+
// |user_id|user_id|        common_items|item_count|
// +-------+-------+--------------------+----------+
// |      2|      3|  [foo, foobar, bar]|         3|
// |      2|      4|[foobar, phone, bar]|         3|
// +-------+-------+--------------------+----------+
...