Столбец Apache Spark с собранной информацией и объединением строк - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть следующий DataFrame:

+------+------------------+--------------+-------------+
|  name|             email|         phone|      country|
+------+------------------+--------------+-------------+
|  Mike|  mike@example.com|+91-9999999999|        Italy|
|  Alex|  alex@example.com|+91-9999999998|       France|
|  John|  john@example.com| +1-1111111111|United States|
|Donald|donald@example.com| +1-2222222222|United States|
|   Dan|   dan@example.com|+91-9999444999|       Poland|
| Scott| scott@example.com|+91-9111999998|        Spain|
|   Rob|   rob@example.com|+91-9114444998|        Italy|
+------+------------------+--------------+-------------+

после применения следующего преобразования:

val tags = Map(
  "big" -> "country IN (FROM big_countries)",
  "medium" -> "country IN (FROM medium_countries)",
  // a few thousands of other tag keys and conditions with any possible SQL statements allowed in SQL WHERE clause(users create them on the application UI)
  "sometag" -> "name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'")

def buildTagQuery(tag: String, tagCondition: String, table: String): String = {
    f"FROM $table WHERE $tagCondition"
}

val userTags = tags.map {
  case (tag, tagCondition) => {
    spark.sql(buildTagQuery(tag, tagCondition, "users"))
      .withColumn("tag", lit(tag).cast(StringType))
  }
}

val unionDf = userTags.foldLeft(userTags.head) {
  case (acc, df) => acc.union(df)
}

Я получаю следующий DataFrame:

+------+------------------+--------------+-------------+-------+
|  name|             email|         phone|      country|    tag|
+------+------------------+--------------+-------------+-------+
|  Mike|  mike@example.com|+91-9999999999|        Italy|    big|
|  Alex|  alex@example.com|+91-9999999998|       France|    big|
|  John|  john@example.com| +1-1111111111|United States|    big|
|Donald|donald@example.com| +1-2222222222|United States|    big|
| Scott| scott@example.com|+91-9111999998|        Spain|    big|
|   Rob|   rob@example.com|+91-9114444998|        Italy|    big|
|  Mike|  mike@example.com|+91-9999999999|        Italy|    big|
|  Alex|  alex@example.com|+91-9999999998|       France|    big|
|  John|  john@example.com| +1-1111111111|United States|    big|
|Donald|donald@example.com| +1-2222222222|United States|    big|
| Scott| scott@example.com|+91-9111999998|        Spain|    big|
|   Rob|   rob@example.com|+91-9114444998|        Italy|    big|
|   Dan|   dan@example.com|+91-9999444999|       Poland| medium|
| Scott| scott@example.com|+91-9111999998|        Spain| medium|
|Donald|donald@example.com| +1-2222222222|United States|sometag|
+------+------------------+--------------+-------------+-------+

, который дублирует каждую исходную запись DataFrame с дополнительной информацией в столбце тегов, но мне нужно что-то вроде этого (не дублированные записи из исходного DataFrame и набор тегов в столбце tag):

+------+------------------+--------------+-------------+--------------+
|  name|             email|         phone|      country|           tag|
+------+------------------+--------------+-------------+--------------+
|  Mike|  mike@example.com|+91-9999999999|        Italy|         [big]|
|  Alex|  alex@example.com|+91-9999999998|       France|         [big]|
|  John|  john@example.com| +1-1111111111|United States|         [big]|
|Donald|donald@example.com| +1-2222222222|United States| [big,sometag]|
|   Dan|   dan@example.com|+91-9999444999|       Poland|      [medium]|
| Scott| scott@example.com|+91-9111999998|        Spain|  [big,medium]|
|   Rob|   rob@example.com|+91-9114444998|        Italy|         [big]|
+------+------------------+--------------+-------------+--------------+

Сейчас я не знаю, как изменить свое преобразование, чтобы получить такую ​​структуру со столбцом tag, как ArrayType, без первоначального дублирования строки.

1 Ответ

0 голосов
/ 10 ноября 2018

Вот один из возможных способов, не меняя слишком много логики.

Сначала вам нужно будет назначить уникальный идентификатор для таблицы пользователей.Как показано ниже:

import org.apache.spark.sql.functions._

val userstable = spark.sql("select * from users")

val userswithId = userstable.withColumn("UniqueID", monotonically_increasing_id())

userswithId.createOrReplaceTempView("users")

Теперь ваши tags и userTags остаются такими же, как указано выше.

val tags = Map(
  "big" -> "country IN (FROM big_countries)",
  "medium" -> "country IN (FROM medium_countries)",
  // a few thousands of other tag keys and conditions with any possible SQL statements allowed in SQL WHERE clause(users create them on the application UI)
  "sometag" -> "name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'")

def buildTagQuery(tag: String, tagCondition: String, table: String): String = {
  f"FROM $table WHERE $tagCondition"
}

Здесь мы выбираем только UniqueID и tagколонны.

val userTags = tags.map {
  case (tag, tagCondition) => {
    spark.sql(buildTagQuery(tag, tagCondition, "users"))
      .withColumn("tag", lit(tag).cast(StringType)).select("UniqueID", "tag")
  }
}

Это очень важно.В вашем исходном коде была небольшая ошибка при использовании foldLeft.Заголовок списка был сложен дважды в вашем случае.То, что я сделал здесь, выбрал голову в отдельную переменную, а затем отбросил ее из userTags.Логика фолдинга такая же, как и раньше.Но в этом случае мы не складываем элемент head дважды.

val headhere = userTags.head
val userResults  = userTags.drop(1)
val unionDf2 = userResults.foldLeft(headhere) {
  case (acc, df) => acc.union(df)
}

Теперь мы группируемся по столбцу UniqueID, объединяя tags в его собственный список.

val unionDf3 = unionDf2.groupBy("UniqueID").agg(collect_list("tag"))

println("Printing the unionDf3 result")
unionDf3.show(25)

Наконец, мы объединяем вашу таблицу users с уникальным идентификатором, который мы присвоили ранее (т. Е. Таблица userswithId), с предыдущим кадром данных, чтобы получить окончательный результат.

val finalResult = userswithId.join(unionDf3,"UniqueID")

println("Printing the final result")
finalResult.show(25)

FinalРезультат как ниже:

+--------+------+------------------+--------------+-------------+-----------------+
|UniqueID|  name|             email|         phone|      country|collect_list(tag)|
+--------+------+------------------+--------------+-------------+-----------------+
|       0|  Alex|  alex@example.com|+91-9999999998|       France|            [big]|
|       1|  John|  john@example.com| +1-1111111111|United States|            [big]|
|       2|Donald|donald@example.com| +1-2222222222|United States|   [big, sometag]|
|       4| Scott| scott@example.com|+91-9111999998|        Spain|    [big, medium]|
+--------+------+------------------+--------------+-------------+-----------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...