Я пытаюсь прочитать инкрементные данные из моего источника данных, используя Scala-Spark.Прежде чем перейти к исходным таблицам, я пытаюсь вычислить минимальный и максимальный столбец раздела, который я использую в своем коде в будущем, который представлен в классе: GetSourceMeta
, как указано ниже.
def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
var boundsMap = scala.collection.mutable.Map[String, String]()
keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
val minMax = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
println("MinMax: " + minMax)
val boundsDF = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
try {
val maxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
println("Bounds: " + maxTms)
boundsMap += (table -> maxTms)
} catch {
case np: java.lang.NullPointerException => { println("No data found") }
case e: Exception => { println(s"Unknown exception: $e") }
}
}
)
boundsMap.foreach(println)
boundsMap
}
Я вызываю вышеупомянутый метод в моем основном методе как:
object LoadToCopyDB {
val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
val gsm = new GetSourceMeta()
val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
case Success(values) => values.foreach(println)
case Failure(f) => f.printStackTrace
}
.
.
.
}
Ну, onComplete
не печатал никаких значений, поэтому я использовал andThen
, как показано ниже, и это тоже не помогло.
val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
case Success(outval) => outval.foreach(println)
case Failure(e) => println(e)
}
Ранее основной поток выходил, не позволяя Future: getBounds исполниться.Следовательно, я не смог найти никаких выводов из будущего, отображаемых на терминале.Я узнал, что мне нужно оставить основной поток в ожидании, чтобы завершить будущее.Но когда я использую Await в main вместе с onComplete:
Await.result(bounds, Duration.Inf)
Компилятор выдает ошибку:
Type mismatch, expected: Awaitable[NotInferedT], actual:Unit
Если я объявляю val minMaxKeyMap как Future[scala.collection.mutable.Map[String, String]
, компилятор говорит: Expression of type Unit doesn't conform to expected type Future[mutable.map[String,String]]
Я пытался напечатать значения bounds
после оператора Await, но он просто печатает пустую карту.
Я не мог понять, как можно это исправить.Может ли кто-нибудь дать мне знать, что мне делать, чтобы «Будущее» работало правильно?