Вот один из подходов:
- сборка
item pairs
за отдельную пару пользователей через самостоятельное соединение
- генерирует
common items
из item pairs
с использованием UDF
- отфильтровать результирующий набор данных по общему количеству элементов
как показано ниже:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val users = Seq(
(1, 123), (1, 223), (1, 423),
(2, 123), (2, 423), (2, 1223), (2, 3213),
(3, 223), (3, 423), (3, 1223), (3, 3213),
(4, 123), (4, 1223), (4, 3213)
).toDF("user_id", "item_id")
val item_details = Seq(
(123, "phone"), (223, "game"), (423, "foo"), (1223, "bar"), (3213, "foobar")
)toDF("item_id", "item_name")
val commonItems = udf( (itemPairs: Seq[Row]) =>
itemPairs.collect{ case Row(a: Int, b: Int) if a == b => a }
)
val commonLimit = 2 // Replace this with any specific common item count
val user_common_items =
users.as("u1").join(users.as("u2"), $"u1.user_id" < $"u2.user_id").
groupBy($"u1.user_id", $"u2.user_id").agg(
collect_set(
struct($"u1.item_id".as("ui1"), $"u2.item_id".as("ui2"))
).as("item_pairs")).
withColumn("common_items", commonItems($"item_pairs")).
drop("item_pairs").
where(size($"common_items") > commonLimit)
user_common_items.show(false)
// +-------+-------+-----------------+
// |user_id|user_id|common_items |
// +-------+-------+-----------------+
// |2 |3 |[423, 3213, 1223]|
// |2 |4 |[3213, 123, 1223]|
// +-------+-------+-----------------+
Если требуются общие имена элементов вместо идентификаторов элементов, вы можете присоединиться к item_details
на вышеприведенном шаге для агрегирования по именам элементов; или вы можете взорвать существующий common item ids
и присоединиться к item_details
вместе с агрегацией collect_list
по паре пользователей:
user_common_items.
withColumn("item_id", explode($"common_items")).
join(item_details, Seq("item_id")).
groupBy($"u1.user_id", $"u2.user_id").agg(collect_list($"item_name").as("common_items")).
withColumn("item_count", size($"common_items")).
show
// +-------+-------+--------------------+----------+
// |user_id|user_id| common_items|item_count|
// +-------+-------+--------------------+----------+
// | 2| 3| [foo, foobar, bar]| 3|
// | 2| 4|[foobar, phone, bar]| 3|
// +-------+-------+--------------------+----------+