Несколько потоков RxJava с .subscribe vs .concatmap - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь отправить массив запросов в конечную точку с Observable.fromIterable(array).subscribe() с намерением, чтобы каждый элемент последовательно попадал на сервер.

В настоящее время Observable не ожидает завершения асинхронной задачи отправки в конечную точку, прежде чем выполнить следующий запрос.У меня есть две потенциальные идеи, но я не уверен, как их реализовать.

  1. Переместить .subscribe().onNext() к обратному вызову, чтобы каждый элемент вызывался после завершения асинхронной задачи.

  2. Используйте взамен .concatmap.Я видел, что это опубликовано как рекомендуемый метод для людей с такими же действиями, как у меня, с некоторыми массивами элементов с асинхронными задачами, но я не уверен, почему это будет работать или как это работает иначе, чем у меня выше.

1 Ответ

0 голосов
/ 10 декабря 2018

Оператор fromIterable() испускает каждый элемент, который может.В нисходящем направлении приходится управлять ожиданием асинхронных операций с помощью одного из операторов map().

Observable.fromIterable(array)
  .flatMap( item -> 
             Observable.fromCallable( asyncOp( item ) ), // 1
             1) // 2
  .subscribe(); // 3
  1. fromCallable() превращает асинхронную операцию в наблюдаемую.
  2. Число 1 - это «максимальный параллелизм», наложенный flatMap.Это означает, что flatMap() будет подписываться только на одну тему и будет ждать результата перед обработкой следующей.
  3. Ваш оператор subscribe() всегда должен иметь обработчик onError().В противном случае вышестоящие ошибки могут привести к сбою потока.
...