остановить все asyn c Задача, когда они отказывают через порог? - PullRequest
3 голосов
/ 16 февраля 2020

Я использую Monix Task для асинхронного управления c.

сценарий

  1. задачи выполняются в Параллельно
  2. если сбой происходит в X раз
  3. остановка всех задач, которые еще не находятся в полном состоянии (как можно быстрее)

мое решение

Я выдвигаю идеи о том, что гонка между 1. результатом и 2. счетчиком ошибок, и отменяю проигравшего.
Через Task.race, если счетчик ошибок сначала достигнет порога, тогда задачи будут отменены Task.race.

эксперимент

на Аммонит REPL

{
  import $ivy.`io.monix::monix:3.1.0`
  import monix.eval.Task
  import monix.execution.atomic.Atomic
  import scala.concurrent.duration._
  import monix.execution.Scheduler
  //import monix.execution.Scheduler.Implicits.global
  implicit val s = Scheduler.fixedPool("race", 2) // pool size

  val taskSize = 100
  val errCounter = Atomic(0)
  val threshold = 3

  val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
  val guard = Task(f"stop because too many error: ${errCounter.get()}")
    .restartUntil(_ => errCounter.get() >= threshold)

  val race = Task
    .race(guard, Task.gather(tasks))
    .runToFuture
    .onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}

выпуск

результат зависит от размера пула потоков!?

Для размера пула 1
результат почти всегда является успешной задачей, то есть без остановки.

Success(Right(.........))
completed task: 100 // all task success !

Для размера пула 2
очень не определено c между успехом и неудачей, и отмена не является точной. например:

Success(Left(stop because too many error: 1))
completed task: 98

отмена выполняется до 98 задач.
количество ошибок странно мало до порогового значения.

Глобальный планировщик по умолчанию получает такое же поведение результата.

Для размера пула 200
это более детерминировано c, и остановка происходит раньше, поэтому более точна в том смысле, что меньше задач было выполнено.

Success(Left(stop because too many error: 2))
completed task: 8

чем больше размер пула, тем лучше.


Если я изменю Task.gather на Task.sequence выполнение, все проблемы исчезнут !


В чем причина этой зависимости от размера пула? Как его улучшить или есть лучшая альтернатива для остановки задач, когда возникает слишком много ошибок?

1 Ответ

1 голос
/ 18 февраля 2020

То, что вы видите, скорее всего, связано с планировщиком моникса и его стремлением к справедливости. Это довольно сложный топи c, но документация и скалярные документы превосходны (см .: https://monix.io/docs/3x/execution/scheduler.html#execution -модель )

Если у вас только один поток (или несколько), это займет некоторое время пока задание "охранник" не получит еще один ход для проверки. С Task.gather вы запускаете 100 задач одновременно, поэтому планировщик очень занят, и «охранник» не может проверить снова, пока другие задачи уже не будут выполнены. Если у вас есть один поток на задачу, планировщик не может гарантировать честность, и поэтому «сторож» несправедливо проверяет гораздо чаще и может завершить sh раньше.

Если вы используете Task.sequence, эти 100 задач выполняются последовательно, именно поэтому задание «охранник» дает гораздо больше возможностей для завершения sh при необходимости. Если вы хотите сохранить свой код таким, какой он есть, вы можете использовать Task.gatherN(parallelism = 4), что ограничит параллелизм и, следовательно, позволит вашему «охраннику» чаще проверять (промежуточный план между Task.sequence и Task.gather).

Мне кажется, что это немного похоже на Go код (с использованием Task.race, как Go 'select), и вы также используете неограниченные побочные эффекты, что еще больше усложняет понимание происходящего. Я пытался переписать вашу программу так, чтобы она была более идиоматичной c, и для сложного параллелизма я обычно использую такие потоки, как Observable:

import cats.effect.concurrent.Ref
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.concurrent.duration._

object ErrorThresholdDemo extends App {

  //import monix.execution.Scheduler.Implicits.global
  implicit val s: Scheduler = Scheduler.fixedPool("race", 2) // pool size

  val taskSize  = 100
  val threshold = 30

  val program = for {
    errCounter <- Ref[Task].of(0)

    tasks = (1 to taskSize).map(n => Task.sleep(100.millis).flatMap(_ => errCounter.update(_ + (n % 2))))

    tasksFinishedCount <- Observable
      .fromIterable(tasks)
      .mapParallelUnordered(parallelism = 4) { task =>
        task
      }
      .takeUntilEval(errCounter.get.restartUntil(_ >= threshold))
      .map(_ => 1)
      .sumL

    errorCount <- errCounter.get
    _          <- Task(println(f"completed tasks: $tasksFinishedCount, errors: $errorCount"))
  } yield ()

  program.runSyncUnsafe()
}

Как видите, я больше не использую глобальный изменяемый побочные эффекты, но вместо этого Ref, который также использует Atomic, но предоставляет функциональный API, который мы можем использовать с Task. В демонстрационных целях я также изменил порог на 30, и только каждая другая задача будет «ошибкой». Таким образом, ожидаемый результат всегда равен completed tasks: 60, errors: 30 независимо от размера пула потоков.

Я все еще использую опрос с errCounter.get.restartUntil(_ >= threshold), который может потребовать слишком много ресурсов процессора на мой вкус, но он близок к вашему оригинальная идея и работает хорошо.

Обычно я не создаю список задач заранее, а вместо этого добавляю входные данные в Observable и создаю задачи внутри .mapParallelUnordered. Этот код хранит ваш список, поэтому реальное отображение не используется (в нем уже есть задачи).

Вы можете выбрать желаемый параллелизм так же, как с Task.gatherN, что довольно неплохо, imo.

Дайте мне знать, если что-то все еще неясно:)

...