Я пытаюсь создать сервис последовательной загрузки с использованием RxJava.
Пользователь может добавлять элементы в пакетном режиме (20, 30 и т. Д.) Или отдельные элементы. Эти элементы будут добавлены в очередь, а затем загружены последовательно по 10 раз.
Для этого я использую PublishSubject:
PublishSubject<Int> pubSubject = PublishSubject.create();
, который генерирует элемент (идентификаторы), добавленный пользователем, а затем применяет оператор буфера к элементам пакета. Используя эти идентификаторы, элементы загружаются в flatMap и возвращаются в следующем разделе подписки.
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
Код в основном работает как положено. Элементы пакетируются и загружаются успешно, но даже после отправки всех элементов, буфер все еще вызывает flatMap с пустым списком, а onComplete () никогда не вызывается.
Я хочу знать, есть ли какой-либо метод или способ в RxJava для получения обратного вызова onComplete, когда в буфере больше нет элементов. Потому что иначе мой сервис загрузки никогда не прекращается.