давайте начнем с того, чего я достиг sh. Допустим, у меня есть 10 postgres баз данных, и я хочу их импортировать. Итак, я постараюсь сделать это в разных темах. В каждом потоке я буду использовать функцию Spark DataFrameReader jdb c для создания фрейма данных. Каждый поток возвращает Future.
Try{
spark.read.jdbc(url, sqlQuery, properties)
} match {
case Failure(e) =>
logger.error(s"Failed", e)
Failure(new RuntimeException(s"Failed", e))
case Success(t) => Success(t)
}
Теперь проблема заключается в том, что, когда происходит тайм-аут в ожидании окончания фьючерса до 1016 *, искра не закрывает или не отменяет запрос на postgres.
implicit lazy val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
val tasks: Iterable[Future[Try[Unit]]] = ...
try {
val results = Await.result(Future.sequence(tasks), Duration(20, TimeUnit.SECONDS))
if (results.exists(_.isFailure))
throw new RuntimeException("Failed")
} finally {
try {
sc.cancelAllJobs()
xc.shutdown()
xc.shutdownNow()
if (!xc.awaitTermination(20, TimeUnit.SECONDS)) {
printf("could not wait ")
}
} catch {
case e: Throwable => printf("niente")
}
}
Я попытался отменить все задания контекста спарк, завершив работу ExecutionContextExecutorService, но запрос продолжает работать в postgres до завершения. Я бы ожидал, что Спарк отменит его.
Я также попытался отладить его и могу подтвердить, что поток прерывается при входе в предложение case Failure(e) =>
.
Есть идеи?