Понимание эффекта кошек `Cancelable` - PullRequest
1 голос
/ 01 мая 2020

Я пытаюсь понять, как работает эффект кошек Cancelable. У меня есть следующее минимальное приложение, основанное на документации

import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._

object Main extends IOApp {
  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[Unit] = {

    IO.cancelable { cb =>
      val r = new Runnable {
        def run() =
          cb(Right(()))
      }
      val f = sc.schedule(r, d.length, d.unit)

      // Returning the cancellation token needed to cancel
      // the scheduling and release resources early
      val mayInterruptIfRunning = false
      IO(f.cancel(mayInterruptIfRunning)).void
    }
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val scheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor()
    for {
      x <- delayedTick(1.second)(scheduledExecutorService)
      _ <- IO(println(s"$x"))
    } yield ExitCode.Success
  }
}

Когда я запускаю это:

❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()

Программа просто зависает в этот момент. У меня много вопросов:

  1. Почему программа зависает вместо завершения через 1 секунду?
  2. Почему мы устанавливаем mayInterruptIfRunning = false? Разве не весь смысл отмены для прерывания выполняющейся задачи?
  3. Является ли это рекомендуемым способом определения ScheduledExecutorService? Я не видел примеров в документации.
  4. Эта программа ждет 1 секунду, а затем возвращает () (затем неожиданно зависает). Что если я захочу вернуть что-нибудь еще? Например, скажем, я хотел вернуть строку, результат некоторых длительных вычислений. Как мне извлечь это значение из IO.cancelable? Кажется, сложность в том, что IO.cancelable возвращает операцию отмены, а не возвращаемое значение процесса, который должен быть отменен.

Простите длинный пост, но это мое build.sbt:

name := "cats-effect-tutorial"

version := "1.0"

fork := true

scalaVersion := "2.12.8"

libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()

scalacOptions ++= Seq(
  "-feature",
  "-deprecation",
  "-unchecked",
  "-language:postfixOps",
  "-language:higherKinds",
  "-Ypartial-unification")

1 Ответ

0 голосов
/ 02 мая 2020

Мне удалось найти ответ на эти вопросы, хотя есть некоторые вещи, которые я не понимаю.

Почему программа зависает вместо завершения через 1 секунду?

По какой-то причине Executors.newSingleThreadScheduledExecutor() вызывает зависание. Чтобы решить проблему, мне пришлось использовать Executors.newSingleThreadScheduledExecutor(new Thread(_)). Похоже, единственное отличие состоит в том, что первая версия эквивалентна Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), хотя ничто в документах не объясняет, почему это так.

Почему мы установить mayInterruptIfRunning = false? Разве весь смысл отмены заключается в том, чтобы прервать выполнение задачи?

Я должен признать, что не совсем понимаю это. Опять же, документы не особо проясняли этот вопрос. Переключение флага на true, похоже, не меняет поведения вообще, по крайней мере, в случае прерываний Ctrl-c.

Является ли это рекомендуемым способом определения ScheduledExecutorService? Я не видел примеров в документах.

Очевидно, нет. Способ, который я придумал, был слабо вдохновлен этим фрагментом из исходного кода эффекта кошки.

Эта программа ждет 1 секунду, а затем возвращает () (затем неожиданно виснет). Что если я захочу вернуть что-нибудь еще? Например, скажем, я хотел вернуть строку, результат некоторых длительных вычислений. Как мне извлечь это значение из IO.cancelable? Кажется, сложность заключается в том, что IO.cancelable возвращает операцию отмены, а не возвращаемое значение процесса, подлежащего отмене.

Блок IO.cancellable { ... } возвращает IO[A] и обратный вызов cb функция имеет тип Either[Throwable, A] => Unit. Логически это говорит о том, что все, что вводится в функцию cb, - это то, что будет возвращено выражением IO.cancellable (в IO). Таким образом, чтобы вернуть строку "hello" вместо (), мы переписываем delayedTick:

  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]

    implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
    IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
      val r = new Runnable {
        def run() =
          cb(Right("hello")) // Note "hello" instead of ()
      }
      val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
      IO(f.cancel(true))
    }
  }
...