Остановить длительный процесс на ресурсе - PullRequest
0 голосов
/ 30 октября 2019

У меня есть ресурс, который выполняет потенциально длительную операцию блокировки. В реальном коде это подпрограмма ZeroMQ, которая ожидает следующего сообщения (не слишком отличается от примера кода здесь ), но в этом примере я создал фиктивный цикл, который завершается только тогда, когда ресурсзакрыто. Проблема в том, что если я прерву выполнение с помощью ctrl+c, я получу исключение, цикл продолжит работать, и close будет вызван только во второй раз, когда я нажму ctrl+c. Я предполагаю, что то же самое произойдет, если приложение завершится ненормальным образом.

Это определение ресурса:

class DummyLoop extends Closeable {

    var running = true

    def run() = {
      while (running) {
        println("sleeping")
        Thread.sleep(1000)
      }
    }

    override def close(): Unit = {
      println("closing...")
      running = false
    }
  }

  def dummyLoop(): Resource[IO, DummyLoop] =
    Resource.fromAutoCloseable(IO(new DummyLoop))

Затем из main я использую его так:

  def run(args: List[String]): IO[ExitCode] = {
    dummyLoop()
      .use {
        dl =>
          for {
            _ <- IO(dl.run())
          } yield ExitCode.Success
      }
  }

Я ожидаю, что метод close будет вызываться при прерывании выполнения. Вместо этого я получаю это:

...
# {----------------HERE THE PROGRAMME STARTS TO RUN NORMALLY}
[info] running TestApp 
sleeping
sleeping
sleeping
sleeping
sleeping
sleeping
sleeping
# {----------------HERE I HIT THE FIRST CTRL+C}
^C
[warn] Canceling execution...
[error] Total time: 9 s, completed 30 Oct 2019, 18:11:50
[warn] Run canceled.
[error] (run-main-0) java.lang.InterruptedException
Exception in thread "sbt-bg-threads-1" java.util.concurrent.RejectedExecutionException
        at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1889)
        at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
        at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
        at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:24)
        at sbt.internal.BackgroundThreadPool$BackgroundRunnable.$anonfun$cleanup$1(DefaultBackgroundJobService.scala:390)
        at sbt.internal.BackgroundThreadPool$BackgroundRunnable.$anonfun$cleanup$1$adapted(DefaultBackgroundJobService.scala:389)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at sbt.internal.BackgroundThreadPool$BackgroundRunnable.cleanup(DefaultBackgroundJobService.scala:389)
        at sbt.internal.BackgroundThreadPool$BackgroundRunnable.run(DefaultBackgroundJobService.scala:359)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:830)
[error] java.lang.InterruptedException
[error]         at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1040)
[error]         at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
[error]         at cats.effect.internals.IOPlatform$.$anonfun$unsafeResync$2(IOPlatform.scala:50)
[error]         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[error]         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
[error]         at scala.concurrent.package$.blocking(package.scala:146)
[error]         at cats.effect.internals.IOPlatform$.unsafeResync(IOPlatform.scala:50)
[error]         at cats.effect.IO.unsafeRunTimed(IO.scala:325)
[error]         at cats.effect.IO.unsafeRunSync(IO.scala:240)
[error]         at cats.effect.internals.IOAppPlatform$.main(IOAppPlatform.scala:24)
[error]         at cats.effect.IOApp.main(IOApp.scala:67)
[error]         at cats.effect.IOApp.main$(IOApp.scala:66)
[error]         at io.tokenanalyst.bitcoinrpc.TestApp$.main(TestApp.scala:25)
[error]         at io.tokenanalyst.bitcoinrpc.TestApp.main(TestApp.scala)
[error]         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.base/java.lang.reflect.Method.invoke(Method.java:567)
[error] stack trace is suppressed; run last Compile / bgRun for the full output
# {-----------THE LOOP KEEPS RUNNING}
sleeping
sleeping
sleeping
# {-----------WHEN I HIT THE SECOND CTRL+C, CLOSE IS CALLED AND THE PROGRAMME FINALLY TERMINATES}
^C[warn] Thread[shutdownHook1,5,run-main-group-0] loading cats.effect.internals.CancelUtils$ after test or run has completed. This is a likely resource leak
closing...

Как правильно обращаться с этими случаями? Что я делаю неправильно? Спасибо!

1 Ответ

0 голосов
/ 30 октября 2019

Оказалось, что обновление до cats v 2.0.0 решило проблему.

...