Невозможно распечатать значения Scala Future с помощью onComplete & andThen - PullRequest
0 голосов
/ 27 апреля 2019

Я пытаюсь прочитать инкрементные данные из моего источника данных, используя Scala-Spark.Прежде чем перейти к исходным таблицам, я пытаюсь вычислить минимальный и максимальный столбец раздела, который я использую в своем коде в будущем, который представлен в классе: GetSourceMeta, как указано ниже.

def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
    var boundsMap = scala.collection.mutable.Map[String, String]()
    keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
        val minMax    = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
        println("MinMax: " + minMax)
        val boundsDF  = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
        try {
            val maxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
            println("Bounds: " + maxTms)
            boundsMap += (table -> maxTms)
        } catch {
            case np: java.lang.NullPointerException =>  { println("No data found") }
            case e: Exception => { println(s"Unknown exception: $e") }
        }
    }
    )
    boundsMap.foreach(println)
    boundsMap
}

Я вызываю вышеупомянутый метод в моем основном методе как:

object LoadToCopyDB {
    val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
        val gsm = new GetSourceMeta()
        val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
          case Success(values) => values.foreach(println)
          case Failure(f)      => f.printStackTrace
    }
.
.
.
}

Ну, onComplete не печатал никаких значений, поэтому я использовал andThen, как показано ниже, и это тоже не помогло.

val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
  case Success(outval) => outval.foreach(println)
  case Failure(e)        => println(e)
}

Ранее основной поток выходил, не позволяя Future: getBounds исполниться.Следовательно, я не смог найти никаких выводов из будущего, отображаемых на терминале.Я узнал, что мне нужно оставить основной поток в ожидании, чтобы завершить будущее.Но когда я использую Await в main вместе с onComplete:

Await.result(bounds, Duration.Inf)

Компилятор выдает ошибку:

Type mismatch, expected: Awaitable[NotInferedT], actual:Unit

Если я объявляю val minMaxKeyMap как Future[scala.collection.mutable.Map[String, String], компилятор говорит: Expression of type Unit doesn't conform to expected type Future[mutable.map[String,String]]

Я пытался напечатать значения bounds после оператора Await, но он просто печатает пустую карту.

Я не мог понять, как можно это исправить.Может ли кто-нибудь дать мне знать, что мне делать, чтобы «Будущее» работало правильно?

1 Ответ

0 голосов
/ 27 апреля 2019

В таких случаях всегда лучше следовать типам.Метод onComplete возвращает только Unit, он не возвращает будущее, следовательно, его нельзя передать с помощью Await.

В случае, если вы хотите вернуть Future любого типа, который у вас будетнапример, чтобы отобразить или отобразить значение и вернуть опцию.В этом случае не имеет значения, что вы возвращаете, вы только хотите, чтобы метод Await ожидал этого результата и печатал трассировку.Вы можете обработать возможные исключения в выздоровлении.В вашем коде это будет выглядеть так:

val minMaxKeyMap:Future[Option[Any] = gsm.getBounds(keyIdMap).map { values =>
   values.foreach(println)
   None
}.recover{
   case e: Throwable => 
          e. printStackTrace
          None
}

Обратите внимание, что часть восстановления должна возвращать экземпляр типа.После этого вы можете применить Await to the Future и распечатать результаты.Это не самое красивое решение, но оно будет работать в вашем случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...