несоответствие типов;найдено: Требуется блок: массив [org.apache.spark.sql.Dataset [org.apache.spark.sql.Row]] - PullRequest
0 голосов
/ 15 мая 2018

Почему следующий код имеет ошибку компиляции в операторе возврата,

  def getData(queries: Array[String]): Dataset[Row] = {
    val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
    return res
  }

Error

type mismatch; found : Unit required: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]

Версия Scala 2.11.11

Версия Spark 2.0.0

EDIT: Актуальный случай

  def getDataFrames(queries: Array[String]) = {
    val jdbcResult = queries.map(query => {
      val tablename = extractTableName(query)
      if (tablename.contains("1")) {
        spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl1, query, props)
      } else {
        spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl2, query, props)
      }
    })
  }

Здесь я хочу вернуть объединенный вывод из итерации, например, Array [Dataset [Row]] или Array [DataFrame] (но Dataframe недоступен в 2.0.0 в качестве зависимости). Приведенный выше код делает магию? или как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Вы можете вернуть list из dataframes как List[Dataframe]

def getData(queries: Array[String]): List[Dataframe] = {
  val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props)
  //create multiple dataframe from your queries
  val df1 = ???
  val df2 = ???
  val list = List(df1, df2)
  //You can create a list dynamically with list of quries 
  list
}

registerTempTable возвращает Unit вам лучше удалить registerTempTable и вернуть Dataframe и вернуть list данных.

UPDATE

Вот как вы можете вернуть список фреймов данных со списком запросов

def getDataFrames(queries: Array[String]): Array[DataFrame] = {
  val jdbcResult = queries.map(query => {
    val tablename = extractTableName(query)
    val dataframe = if (tablename.contains("1")) {
      spark.sqlContext.read.format("jdbc").jdbc("", query, prop)
    } else {
      spark.sqlContext.read.format("jdbc").jdbc("", query, prop)
    }
    dataframe
  })
  jdbcResult
}

Надеюсь, это поможет!

0 голосов
/ 15 мая 2018

Из сообщения об ошибке ясно, что в вашей функции имеется несоответствие типов.registerTempTable() api создает таблицу в памяти, ограниченную текущим сеансом, и остается доступной до тех пор, пока SparkSession не станет активным.

Проверьте тип возвращаемого значения registerTempTable () api

измените свой код на следующее, чтобы удалить сообщение об ошибке:

def getData(queries: Array[String]): Unit = {
    val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")

  }

еще лучше было бы написать код следующим образом:

val tempName: String = "Name_Of_Temp_View" spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView(tempName)

Используйте createOrReplaceTempView() , поскольку registerTempTable() устарело, поскольку Spark 2.0.0

Альтернативное решение согласно вашему требованию:

def getData(queries: Array[String], spark: SparkSession): Array[DataFrame] = { spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView("Name_Of_Temp_Table") val result: Array[DataFrame] = queries.map(query => spark.sql(query)) result }

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...