Я должен работать с userRepository.save(User(1, "test"))
для понимания с IO.Я думаю, что это блокирует работу потока и попытался ContextShift.evalOn
, чтобы сделать код правильно (https://typelevel.org/cats-effect/concurrency/basics.html#blocking-threads)
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val blockingEC = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
def doSth(str: String): IO[Unit] = IO(println(str))
def blockingOp(str: String): IO[Unit] = IO(userRepository.save(User(1, str)))
def greeting(msg: String): IO[Unit] =
for {
_ <- doSth("Before blocking operation.")
_ <- contextShift.evalOn(blockingEC)(blockingOp("testName"))
_ <- doSth("After blocking operation")
} yield ()
val run = greeting("test").unsafeRunAsyncAndForget()
Это используется ExecutionContext.global в этом примере, как этого избежать? И как использовать monixfrom (https://monix.io/docs/3x/best-practices/blocking.html). Я попробовал следующий, но операция сохранения не была выполнена.
private val io = Scheduler.io(name = "engine-io")
def executeBlockingIO[T](cb: => T): Future[T] = {
val p = Promise[T]()
io.execute(new Runnable {
def run() =
try {
p.success(cb)
} catch {
case NonFatal(ex) =>
logger.error(s"Uncaught I/O exception", ex)
p.failure(ex)
}
})
p.future
}
def doSth(str: String): IO[Unit] = IO(println(str))
def blockingOp(str: String): IO[Unit] = IO(userRepository.save(User(1, str)))
def greeting(msg: String): IO[Unit] =
for {
_ <- doSth("Before blocking operation.")
_ <- IO.fromFuture(IO(executeBlockingIO(blockingOp("testName"))))
_ <- doSth("After blocking operation")
} yield ()
val test = greeting("test").unsafeRunAsyncAndForget()