Давайте предположим, что у вас есть датафрейм пользователей. В spark можно создать образец такого фрейма данных, например:
import spark.implicits._
val df = Seq(("me", "contact@me.com"),
("me", "me@company.com"),
("you", "you@company.com")).toDF("user_id", "email")
df.show()
+-------+---------------+
|user_id| email|
+-------+---------------+
| me| contact@me.com|
| me| me@company.com|
| you|you@company.com|
+-------+---------------+
Теперь логика будет очень похожа на ту, что есть в SQL:
df.groupBy("user_id")
.agg(countDistinct("email") as "count")
.where('count > 1)
.show()
+-------+-----+
|user_id|count|
+-------+-----+
| me| 2|
+-------+-----+
Затем вы можете добавить .drop("count")
или .select("user_id")
, чтобы сохранить только пользователей.
Обратите внимание, что в искре нет having
предложения . После того, как вы вызвали agg
для агрегации вашего фрейма данных по пользователю, у вас есть обычный фрейм данных, на котором вы можете вызвать любую функцию преобразования, например, фильтр для столбца count
.