ParMapN, который заканчивается, даже когда одна программа встречает ошибку - PullRequest
0 голосов
/ 13 марта 2019

Используя parMapN, можно параллельно выполнить несколько IO с, например:

import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) })
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })

val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }

program.unsafeRunSync()

Пример вывода:

A1
C1
B1
A2
C2
B2
A3
C3
B3
A4
C4
B4
A5
B5
C5
A6
B6
C6
A7
B7
C7
A8
...

Согласно документации , незавершенные задачи отменяются, если любой из IO s завершается с ошибкой.Каков наилучший способ изменить этот механизм, чтобы все IO все равно заканчивали?

В моем случае некоторые из IO ничего не возвращают (IO[Unit]), и я все еще хочу убедиться, что все работает до тех пор, пока не завершится или не обнаружит ошибку.

1 Ответ

1 голос
/ 13 марта 2019

Ну, я нашел один из возможных ответов вскоре после публикации моего вопроса. Не уверен, что это лучший способ справиться с этим, но определение моих IO как мне подходит:

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) }).attempt
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) }).attempt
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) }).attempt
...