Тестовый запрос sparksql - PullRequest
0 голосов
/ 12 июня 2018

У меня есть Dataframe, для которого я хочу выполнить простой запрос, например:

def runQuery(df: DataFrame, queryString: String): DataFrame = {

    df.createOrReplaceTempView("myDataFrame")
    spark.sql(queryString)

}

Где queryString может быть что-то вроде

"SELECT name, age FROM myDataFrame WHERE age > 30"

Но я бы очень хотелзаранее знать, будет ли работать запрос без исключения.Например, что если df не имеет столбцов name и age?Я хочу написать что-то вроде этого, чтобы справиться с этим:

def runQuery(df: DataFrame, queryString: String): DataFrame = {

    if (/*** df and queryString are compatible ***/) {
        df.createOrReplaceTempView("myDataFrame")
        spark.sql(queryString)
    } else {
        spark.createDataFrame(sc.emptyRDD[Row], df.schema)
    }

}

Есть ли способ проверить это в операторе 'if'?

Ответы [ 2 ]

0 голосов
/ 13 июня 2018

Вы можете проверить все столбцы, присутствующие в фрейме данных или нет, с помощью запуска искрового задания

  def runQuery(df: DataFrame, queryString: String): DataFrame = 
    if(Array("name", "age", "address").forall(df.columns.contains)) {
      df.createOrReplaceTempView("myDataFrame")
      df.sparkSession.sql(queryString)
    } else {
      df.sparkSession.emptyDataset(RowEncoder(df.schema))
    }

, вы также можете использовать df.schema для соответствия типу данных

0 голосов
/ 12 июня 2018

Я бы не стал сильно беспокоиться об исключениях.Просто оберните это Try:

import scala.util.Try
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def runQuery(df: DataFrame, queryString: String): DataFrame = Try {
  df.createOrReplaceTempView("myDataFrame")
  df.sparkSession.sql(queryString)
}.getOrElse(df.sparkSession.emptyDataset(RowEncoder(df.schema)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...