как получить доступ и объединить несколько типов DataFrame of Future в scala - PullRequest
0 голосов
/ 22 января 2020

У меня есть искра scala приложение. Я пытаюсь использовать внутри него Futures, чтобы распараллелить несколько независимых наборов операций. Я называю их в Futures, и они возвращают мне DataFrame типа Future. Как я могу объединить их в конце и выдать ошибку, если какое-либо из Future не сможет вычислить. Ниже мой код. Когда я пытаюсь применить объединение Dataframe в блоке onComplete, он говорит об этой ошибке

value union is not a member of scala.concurrent.Future[(scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame])]..

Любая помощь будет высоко оценена Спасибо

val future_session = Future{ ProcessSession(df, spark) }
val future_links =  Future{ ProcessSession(df, spark) }
val future_nodes = Future { ProcessSession(df, spark) }

val result = for {
      r1 <- future_session
      r2 <- future_links
      r3 <- future_nodes
} yield ( 
   r1,r2,r3
)

result.onComplete {          
    case Success(x) => {
      log.info("Execution completed")       
    }
    case Failure(e) => e.printStackTrace
}

1 Ответ

0 голосов
/ 22 января 2020

Похоже, что ProcessSession.apply само по себе приводит к Future[DataFrame] В зависимости от того, сколько работы выполнено, прежде чем вы получите Future, вы захотите:

  • если много работы (таким образом, параллельный вызов ProcessSession.apply полезен для дополнительных затрат на создание обещания и планирование задачи в контексте выполнения), вы можете использовать метод .flatten в Future[Future[T]] для удаления один слой "futurity":

    for {
      r1 <- future_session.flatten
      r2 <- future_links.flatten
      r3 <- future_nodes.flatten
    } // and so forth
    
  • Если ProcessSession.apply не очень много, прежде чем вернуть Future, тогда просто замените блоки Future на raw ProcessSession звонки:

    val future_session = ProcessSession(df, spark)
    val future_links = ProcessSession(df, spark) // Not sure what you really wanted here, but I'm going with what was in the code you posted
    val future_nodes = ProcessSession(df, spark)
    
...