То, что вы видите, скорее всего, связано с планировщиком моникса и его стремлением к справедливости. Это довольно сложный топи c, но документация и скалярные документы превосходны (см .: https://monix.io/docs/3x/execution/scheduler.html#execution -модель )
Если у вас только один поток (или несколько), это займет некоторое время пока задание "охранник" не получит еще один ход для проверки. С Task.gather
вы запускаете 100 задач одновременно, поэтому планировщик очень занят, и «охранник» не может проверить снова, пока другие задачи уже не будут выполнены. Если у вас есть один поток на задачу, планировщик не может гарантировать честность, и поэтому «сторож» несправедливо проверяет гораздо чаще и может завершить sh раньше.
Если вы используете Task.sequence
, эти 100 задач выполняются последовательно, именно поэтому задание «охранник» дает гораздо больше возможностей для завершения sh при необходимости. Если вы хотите сохранить свой код таким, какой он есть, вы можете использовать Task.gatherN(parallelism = 4)
, что ограничит параллелизм и, следовательно, позволит вашему «охраннику» чаще проверять (промежуточный план между Task.sequence
и Task.gather
).
Мне кажется, что это немного похоже на Go код (с использованием Task.race
, как Go 'select
), и вы также используете неограниченные побочные эффекты, что еще больше усложняет понимание происходящего. Я пытался переписать вашу программу так, чтобы она была более идиоматичной c, и для сложного параллелизма я обычно использую такие потоки, как Observable
:
import cats.effect.concurrent.Ref
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.concurrent.duration._
object ErrorThresholdDemo extends App {
//import monix.execution.Scheduler.Implicits.global
implicit val s: Scheduler = Scheduler.fixedPool("race", 2) // pool size
val taskSize = 100
val threshold = 30
val program = for {
errCounter <- Ref[Task].of(0)
tasks = (1 to taskSize).map(n => Task.sleep(100.millis).flatMap(_ => errCounter.update(_ + (n % 2))))
tasksFinishedCount <- Observable
.fromIterable(tasks)
.mapParallelUnordered(parallelism = 4) { task =>
task
}
.takeUntilEval(errCounter.get.restartUntil(_ >= threshold))
.map(_ => 1)
.sumL
errorCount <- errCounter.get
_ <- Task(println(f"completed tasks: $tasksFinishedCount, errors: $errorCount"))
} yield ()
program.runSyncUnsafe()
}
Как видите, я больше не использую глобальный изменяемый побочные эффекты, но вместо этого Ref
, который также использует Atomic
, но предоставляет функциональный API, который мы можем использовать с Task
. В демонстрационных целях я также изменил порог на 30, и только каждая другая задача будет «ошибкой». Таким образом, ожидаемый результат всегда равен completed tasks: 60, errors: 30
независимо от размера пула потоков.
Я все еще использую опрос с errCounter.get.restartUntil(_ >= threshold)
, который может потребовать слишком много ресурсов процессора на мой вкус, но он близок к вашему оригинальная идея и работает хорошо.
Обычно я не создаю список задач заранее, а вместо этого добавляю входные данные в Observable и создаю задачи внутри .mapParallelUnordered
. Этот код хранит ваш список, поэтому реальное отображение не используется (в нем уже есть задачи).
Вы можете выбрать желаемый параллелизм так же, как с Task.gatherN
, что довольно неплохо, imo.
Дайте мне знать, если что-то все еще неясно:)