По какой-то причине не удается восстановить из RejectedExecutionException - PullRequest
0 голосов
/ 06 декабря 2018

Итак, я пытаюсь написать задачу, которая продолжает работать до тех пор, пока вы не скажете ей остановиться:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import java.util.concurrent.RejectedExecutionException

def runUntilShutdown(f: => Unit) = {
  val ctx = ExecutionContext.fromExecutorService(null)
  import ExecutionContext.global
  def runTask(): Future[Unit] = Future(f)(ctx)
    .flatMap(_ => runTask())(ctx)
  runTask()
    .recover { case _: RejectedExecutionException => () }(global)
    .onComplete { _ => println("Done") }(global)
  ctx
}

val ctx = runUntilShutdown(Thread.sleep(1000))
ctx.shutdown

Я хочу, чтобы в конце я просто напечатал «Готово», но этого никогда не происходит.

Вместо этого трассировка стека для RejectedExecutionException выводится в stderr:

java.util.concurrent.RejectedExecutionException
at scala.concurrent.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1870)
at scala.concurrent.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
at scala.concurrent.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2973)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1361)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Идеи?

Ответы [ 2 ]

0 голосов
/ 07 декабря 2018

Эта проблема была исправлена ​​для Scala 2.13 * с новой реализацией Future & Promise, вы можете попробовать свой пример на Scala 2.13.0-M5, но вам придется вызвать shutdownNow на вашем ECв противном случае он просто будет продолжать работать, поскольку он не будет принимать новые задачи, но уже запускает ваше будущее.

Пример вывода:

Welcome to Scala 2.13.0-20181205-121558-76b34c4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144).
Type in expressions for evaluation. Or try :help.

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext

scala> import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.RejectedExecutionException

scala> def runUntilShutdown(f: => Unit) = {
     |   val ctx = ExecutionContext.fromExecutorService(null)
     |   import ExecutionContext.global
     |   def runTask(): Future[Unit] = Future(f)(ctx)
     |     .flatMap(_ => runTask())(ctx)
     |   runTask()
     |     .recover { case _: RejectedExecutionException => () }(global)
     |     .onComplete { _ => println("Done") }(global)
     |   ctx
     | }
runUntilShutdown: (f: => Unit)scala.concurrent.ExecutionContextExecutorService

scala> val ctx = runUntilShutdown(Thread.sleep(1000))
ctx: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$3@23d060c2[Running, parallelism = 8, size = 1, active = 1, running = 0, steals = 0, tasks = 0, submissions = 0]

scala> ctx.shutdownNow
res2: java.util.List[Runnable] = []

scala> Done

*: https://github.com/scala/bug/issues/9071 (Этобыло невозможно реализовать правильное поведение со старой реализацией Future & Promise, поэтому в настоящее время не запланировано никакого обратного порта для версии 2.12.)

0 голосов
/ 06 декабря 2018

Похоже, у вас есть рекурсия def runTask(): Future[Unit] = Future(f)(ctx).flatMap(_ => runTask())(ctx), которая никогда не закончится.Таким образом, вывод Done никогда не печатается, потому что будущее никогда не завершается

В этом примере правильно обрабатывается исключение

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext
  import java.util.concurrent.RejectedExecutionException
  import java.util.concurrent.Executors

  def runUntilShutdown(f: () => Unit) = {
    implicit val ctx = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
    Future { f() }
      .recover { case _: RejectedExecutionException => () }
      .onComplete { _ =>
        println("Done")
      }
    ctx
  }

  val ctx = runUntilShutdown { () =>
    Thread.sleep(10000)
  }
  ctx.shutdown()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...