Ошибка дескриптора задачи Monix для списка фьючерсов - PullRequest
0 голосов
/ 20 октября 2019

Как я могу обработать сбой во время асинхронного выполнения задачи? Т.е. хотя бы распечатать стек трассировки и выключить. Код ниже, кажется, ждет вечно ввода> 5

val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

edit

попытка:

futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

не остановит вычисление. Как я могу записать трассировку стека и отменить текущие вычисления и остановить?

1 Ответ

0 голосов
/ 22 октября 2019

Эта проблема состоит из 2 частей:

  • Обеспечение отмены задач.
  • Отмена братьев и сестер при сбое одной задачи.

Выполнение отмены задачи

Monix имеет BooleanCancelable , что позволит вам установить результат isCancelled в true при вызове cancel.

cancel также необходимопозвонить Thread.interrupt, чтобы разбудить его, когда Thread.sleep работает. В противном случае sleep пробежит свой курс. Однако это вызовет InterruptedException в вашей задаче. Это должно быть обработано.

Отмена родных братьев

Существует CompositeCancelable . Это похоже на сценарий использования CompositeCancellable для вызова cancel из родительской задачи. Таким образом, после создания CompositeCancellable (т. Е. Все задачи создаются):

  • Любая ссылка на это должна быть доступна для каждой задачи, чтобы сбойная задача могла вызвать для нее cancel,(Обратите внимание, что это своего рода циклическая ссылка, лучше ее избегать)
  • Или родительская задача (или код) получает уведомление о сбое подзадачи и вызывает cancel. (Это позволит избежать циклической ссылки)

Еще один способ уведомить родственные задачи состоит в том, чтобы использовать AtomicBoolean и часто проверять его (спящий режим 10 миллисекунд вместо 1000). При сбое одной задачи она устанавливает логическое значение, чтобы другие задачи могли остановить их выполнение. Это, конечно, не включает Cancellable. (И это своего рода хак, лучше использовать monix планировщик)

Примечание

Это хорошая идея для вызова Thread.sleep в Task? Я думаю, что это помешало бы другой задаче использовать этот поток. Я думаю, что использование планировщика для добавления задержек и составление этих подзадач является способом наиболее эффективного использования пула потоков.

...