Как ссылаться на пользовательскую переменную коллекции в SQL Spark DataFrame - PullRequest
0 голосов
/ 09 ноября 2018

Мне нужно разрешить пользователям определять разные именованные коллекции, которые они могут использовать во время последней конструкции Spark DataFrame SQL.

Я планировал использовать для этой цели широковещательные переменные Spark, но исходя из следующего вопроса SO Как ссылаться на широковещательную переменную в Spark DataFrameSQL похоже, что это невозможно

Допустим, как пользователь, я создал следующую коллекцию через пользовательский интерфейс приложения:

name: countries_dict
values: Seq("Italy", "France", "United States", "Poland", "Spain")

В другом пользовательском интерфейсе приложения (давайте изменим страницу), как пользователь, я создал следующий SQL-запрос Spark:

SELECT name, phone, country FROM users

и я хотел бы отфильтровать записи по SELECT name, phone, country FROM users WHERE countries in countries_dict

Так, например, сейчас я могу создать нечто подобное следующим образом:

val countriesDict = Seq("Italy", "France", "United States", "Poland", "Spain")

val inDict = (s: String) => {
  countriesDict.contains(s)
}

spark.udf.register("in_dict", inDict)

и затем:

SELECT name, phone, country FROM users WHERE in_dict(country)

но самая большая проблема этого подхода в том, что countriesDict жестко закодирован в коде и не создается динамически на основе пользовательского ввода в пользовательском интерфейсе.

Можно ли как-то расширить этот подход для поддержки динамически создаваемых коллекций (пользователями) с именами и элементами через пользовательский интерфейс приложения?

Ответы [ 2 ]

0 голосов
/ 09 ноября 2018

Здесь не имеет смысла использовать переменные трансляции. Даже если игнорировать структурные проблемы, стоимость вызова udf, вероятно, превысит преимущества вещания (особенно при такой небольшой структуре).

Либо встроенный запрос, если данных мало (используйте вашу любимую библиотеку обработки SQL, чтобы избежать риска внедрения SQL):

SELECT name, phone, country FROM users 
WHERE country IN ('Italy', 'France', 'United States', 'Poland', 'Spain')

или просто преобразовать ввод в DataFrame:

countriesDict.toDF("country").createOrReplaceTempView("countries")

и используйте ANTI JOIN, либо в зависимости от порогового значения широковещания, чтобы автоматически преобразовать это в широковещательное соединение, если данные достаточно малы

SELECT * 
FROM users LEFT ANTI JOIN countries 
ON users.country = countries.country

или с явной подсказкой вещания

SELECT  /*+ MAPJOIN(countries) */  * 
FROM users LEFT ANTI JOIN countries 
ON users.country = countries.country

Наконец, вы можете пропустить часть SQL и использовать DataFrame API либо с isin:

spark.table("users").where($"country" isin (countriesDict: _*))

или если у вас действительно есть логика, требующая UDF:

import org.apache.spark.sql.functions.typedLit

val f = udf((x: String, xs: Seq[String]) => { xs.contains(x) })

spark.table("users").where(f($"country", typedLit(countriesDict)))
0 голосов
/ 09 ноября 2018

Конечно, я не знаю об интерфейсе вашего приложения и т. Д., Но есть ли что-то, что говорит против превращения коллекций в кадры данных? Конечно, вы не можете использовать синтаксис WHERE countries in countries_dict, но вам придется использовать соединение. Но Spark автоматически выполнит объединения как широковещательную рассылку, когда объединенный информационный фрейм будет ниже определенного порога, например как описано в Освоение Apache Spark

Вам просто понадобится какое-то хранилище, где пользователи могут хранить содержимое этих небольших фреймов данных, например, в виде файлов CSV.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...