Предикат Spark IN / EXISTS в операторе SELECT - PullRequest
0 голосов
/ 19 ноября 2018

У меня есть следующий тестовый запрос Spark SQL:

Seq("france").toDF.createOrReplaceTempView("countries")
SELECT CASE WHEN country = 'italy' THEN 'Italy' 
    ELSE ( CASE WHEN country IN (FROM countries) THEN upperCase(country) ELSE country END ) 
    END AS country FROM users

, который выдает следующую ошибку:

Exception in thread "main" org.apache.spark.sql.AnalysisException: 
    IN/EXISTS predicate sub-queries can only be used in a Filter

следующая причина запроса CASE WHEN country IN (FROM countries) является причиной этого.

Существует ли обходной путь в Spark SQL для эмуляции country IN (FROM countries) в выбранных условиях? Я заинтересован в реализации чистого SQL, а не в реализации через API.

Ответы [ 2 ]

0 голосов
/ 19 ноября 2018

Вот правильный запрос SQL:

import sparkSession.implicits._

Seq("france").toDF("country").createOrReplaceTempView("countries")
Seq(("user1", "france"), ("user2", "italy"), ("user2", "usa"))
  .toDF("user", "country").createOrReplaceTempView("users")

val query =
  s"""
     |SELECT
     |  CASE
     |    WHEN u.country = 'italy' THEN 'Italy'
     |    ELSE (
     |      CASE
     |        WHEN u.country = c.country THEN upper(u.country)
     |        ELSE u.country
     |      END
     |    ) END AS country
     |FROM users u
     |LEFT JOIN countries c
     |  ON u.country = c.country
  """.stripMargin
sparkSession.sql(query).show()

Результат:

+-------+
|country|
+-------+
| FRANCE|
|  Italy|
|    usa|
+-------+

Причина, по которой вы можете использовать IN/EXISTS операторы sql только в предикатах: логика в проекциях(CASE-WHEN в нашем случае) вычисляется для каждой строки в наборе данных, возвращаемых при выборе.Имея это в виду, не лучшая идея запускать эквивалент CASE WHEN country IN (SELECT * FROM countries) для каждой строки из таблицы users.Таким образом, SQL предотвращает это на уровне языка (механизм синтаксического анализа sql).

0 голосов
/ 19 ноября 2018

В качестве альтернативы вы можете использовать

withColumn ()

и

when ()

функция (из spark.sql.functions ):

val users = Seq(("1", "france"), ("2", "Italy"), ("3", "italy")).toDF("userId", "country")
val countriesList = Seq("france", "italy", "germany").toList

val result = users.withColumn("country", when(col("country") === "italy", "Italy")
  .when(col("country") isin(countriesList:_*), upper(col("country"))).otherwise(col("country")))

result.show()

Результат:

+------+-------+
|userId|country|
+------+-------+
|     1| FRANCE|
|     2|  Italy|
|     3|  Italy|
+------+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...