SQL-запрос Apache Spark и DataFrame в качестве справочных данных - PullRequest
0 голосов
/ 21 января 2019

У меня есть два Spark DataFrames:

cities DataFrame со следующим столбцом:

city
-----
London
Austin

bigCities DataFrame со следующим столбцом:

name
------
London
Cairo

Мне нужно преобразовать DataFrame cities и добавить туда дополнительный логический столбец: bigCity Значение этого столбца должно быть рассчитано на основе следующего условия "cities.city IN bigCities.name"

Я могу сделать это следующим образом (со статической коллекцией bigCities):

cities.createOrReplaceTempView("cities")

var resultDf = spark.sql("SELECT city, CASE WHEN city IN ['London', 'Cairo'] THEN 'Y' ELSE 'N' END AS bigCity FROM cities")

но я не знаю, как заменить статическую коллекцию bigCities ['London', 'Cairo'] на bigCities DataFrame в запросе. Я хочу использовать bigCities в качестве справочных данных в запросе.

Посоветуйте, пожалуйста, как этого добиться.

Ответы [ 2 ]

0 голосов
/ 21 января 2019

Вы можете использовать collect_list () в таблице bigCities.Проверьте это

scala> val df_city = Seq(("London"),("Austin")).toDF("city")
df_city: org.apache.spark.sql.DataFrame = [city: string]

scala> val df_bigCities = Seq(("London"),("Cairo")).toDF("name")
df_bigCities: org.apache.spark.sql.DataFrame = [name: string]

scala> df_city.createOrReplaceTempView("cities")

scala> df_bigCities.createOrReplaceTempView("bigCities")

scala> spark.sql(" select city, case when array_contains((select collect_list(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+


scala>

Если набор данных большой, вы можете использовать collect_set, который будет более эффективным.

scala> spark.sql(" select city, case when array_contains((select collect_set(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+


scala>
0 голосов
/ 21 января 2019
val df = cities.join(bigCities, $"name".equalTo($"city"), "leftouter").
                withColumn("bigCity", when($"name".isNull, "N").otherwise("Y")).
                drop("name")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...