Из сообщения об ошибке ясно, что в вашей функции имеется несоответствие типов.registerTempTable()
api создает таблицу в памяти, ограниченную текущим сеансом, и остается доступной до тех пор, пока SparkSession не станет активным.
Проверьте тип возвращаемого значения registerTempTable () api
измените свой код на следующее, чтобы удалить сообщение об ошибке:
def getData(queries: Array[String]): Unit = {
val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
}
еще лучше было бы написать код следующим образом:
val tempName: String = "Name_Of_Temp_View"
spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView(tempName)
Используйте createOrReplaceTempView()
, поскольку registerTempTable()
устарело, поскольку Spark 2.0.0
Альтернативное решение согласно вашему требованию:
def getData(queries: Array[String], spark: SparkSession): Array[DataFrame] = {
spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView("Name_Of_Temp_Table")
val result: Array[DataFrame] = queries.map(query => spark.sql(query))
result }