Как остановить эмиссию из PublishSubject, когда буфер пуст - PullRequest
0 голосов
/ 26 июня 2019

Я пытаюсь создать сервис последовательной загрузки с использованием RxJava. Пользователь может добавлять элементы в пакетном режиме (20, 30 и т. Д.) Или отдельные элементы. Эти элементы будут добавлены в очередь, а затем загружены последовательно по 10 раз. Для этого я использую PublishSubject:

PublishSubject<Int> pubSubject = PublishSubject.create();

, который генерирует элемент (идентификаторы), добавленный пользователем, а затем применяет оператор буфера к элементам пакета. Используя эти идентификаторы, элементы загружаются в flatMap и возвращаются в следующем разделе подписки.

  pubSubject.buffer(1, TimeUnit.SECONDS, 10)
            .observeOn(Schedulers.io())
            .flatMap { idsBatch -> downloadByIds(idsBatch) }
            .subscribe(
                /* onNext */ { apiResponse -> handleResponse() },
                /* onError */ { handleError(it) },
                /* onComplete*/ { hideProgressBar() }
             )

Код в основном работает как положено. Элементы пакетируются и загружаются успешно, но даже после отправки всех элементов, буфер все еще вызывает flatMap с пустым списком, а onComplete () никогда не вызывается.

Я хочу знать, есть ли какой-либо метод или способ в RxJava для получения обратного вызова onComplete, когда в буфере больше нет элементов. Потому что иначе мой сервис загрузки никогда не прекращается.

1 Ответ

1 голос
/ 26 июня 2019

Вы можете использовать операцию takeWhile:

Возвращает Observable, который испускает элементы, испускаемые источником ObservableSource, если каждый элемент удовлетворяет указанномуусловие, а затем завершается, как только это условие не выполняется.

pubSubject.buffer(1, TimeUnit.SECONDS, 10)
          .observeOn(Schedulers.io())
          .takeWhile { idsBatch -> idsBatch.isNotEmpty() }
          .flatMap { idsBatch -> downloadByIds(idsBatch) }
          .subscribe(
              /* onNext */ { apiResponse -> handleResponse() },
              /* onError */ { handleError(it) },
              /* onComplete*/ { hideProgressBar() }
           )
...