Вот один из возможных способов, не меняя слишком много логики.
Сначала вам нужно будет назначить уникальный идентификатор для таблицы пользователей.Как показано ниже:
import org.apache.spark.sql.functions._
val userstable = spark.sql("select * from users")
val userswithId = userstable.withColumn("UniqueID", monotonically_increasing_id())
userswithId.createOrReplaceTempView("users")
Теперь ваши tags
и userTags
остаются такими же, как указано выше.
val tags = Map(
"big" -> "country IN (FROM big_countries)",
"medium" -> "country IN (FROM medium_countries)",
// a few thousands of other tag keys and conditions with any possible SQL statements allowed in SQL WHERE clause(users create them on the application UI)
"sometag" -> "name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'")
def buildTagQuery(tag: String, tagCondition: String, table: String): String = {
f"FROM $table WHERE $tagCondition"
}
Здесь мы выбираем только UniqueID
и tag
колонны.
val userTags = tags.map {
case (tag, tagCondition) => {
spark.sql(buildTagQuery(tag, tagCondition, "users"))
.withColumn("tag", lit(tag).cast(StringType)).select("UniqueID", "tag")
}
}
Это очень важно.В вашем исходном коде была небольшая ошибка при использовании foldLeft.Заголовок списка был сложен дважды в вашем случае.То, что я сделал здесь, выбрал голову в отдельную переменную, а затем отбросил ее из userTags
.Логика фолдинга такая же, как и раньше.Но в этом случае мы не складываем элемент head дважды.
val headhere = userTags.head
val userResults = userTags.drop(1)
val unionDf2 = userResults.foldLeft(headhere) {
case (acc, df) => acc.union(df)
}
Теперь мы группируемся по столбцу UniqueID
, объединяя tags
в его собственный список.
val unionDf3 = unionDf2.groupBy("UniqueID").agg(collect_list("tag"))
println("Printing the unionDf3 result")
unionDf3.show(25)
Наконец, мы объединяем вашу таблицу users
с уникальным идентификатором, который мы присвоили ранее (т. Е. Таблица userswithId
), с предыдущим кадром данных, чтобы получить окончательный результат.
val finalResult = userswithId.join(unionDf3,"UniqueID")
println("Printing the final result")
finalResult.show(25)
FinalРезультат как ниже:
+--------+------+------------------+--------------+-------------+-----------------+
|UniqueID| name| email| phone| country|collect_list(tag)|
+--------+------+------------------+--------------+-------------+-----------------+
| 0| Alex| alex@example.com|+91-9999999998| France| [big]|
| 1| John| john@example.com| +1-1111111111|United States| [big]|
| 2|Donald|donald@example.com| +1-2222222222|United States| [big, sometag]|
| 4| Scott| scott@example.com|+91-9111999998| Spain| [big, medium]|
+--------+------+------------------+--------------+-------------+-----------------+