Как обрабатывать ошибки и поддерживать Observable в .mapParallelUnordered - PullRequest
1 голос
/ 16 апреля 2019

Я использую Monix 3 и имею вид этого кода:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .flatMap(i =>
      if (i % 2 == 0) {   // Bad i
        Observable.empty
      } else
        Observable.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))   /*Output: Good i: 1
                                                     Good i: 3
                                                     Good i: 5
                                                     Good i: 7
                                                     Good i: 9*/

Этот код работает хорошо, но у меня много длительных операций ввода-вывода, поэтому я решил провести рефакторинг с .mapParallelUnordered:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .mapParallelOrdered(3)(i =>
      if (i % 2 == 0) {
        Task.raiseError(new Exception(s"Bad i: $i"))
      } else
        Task.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))    /*Output: Good i: 1*/

Я пытаюсь получить тот же результат, что и в первом примере, но в параллельной обработке.Проблема в том, что Task.raiseError убивает целую наблюдаемую, поэтому она останавливается на i = 2.

Как обрабатывать ошибки и поддерживать наблюдаемую активность?

...