Чтобы ответить на ваш первый вопрос:
В этой реализации элементы обрабатываются в конце (подписка (System.out :: println)), когда все страницы загружены. "
Это неверно. Весь смысл реактивного программирования состоит в том, чтобы избежать этого. fetchItems()
возвращает Flowable<T>
, который фактически не извлекает какие-либо элементы, пока что-то не подпишется на него. Когда вы подписываетесь на что-то, подписчик получает уведомление каждый раз, когда товар готов. Вы должны вызвать subscribe()
и передать функцию, которая будет вызываться каждый раз, когда элемент готов. В моем примере я передаю System.out::println
, который печатает значения, но вы можете реализовать свой собственный обработчик, который сохраняет в базу данных.
Я бы предпочел обрабатывать их (сохранение базы данных) сразу же после их загрузки (в .doOnNext (page -> {})
Это сбивает с толку разницу между издателем и потребителем. Издатель производит предметы - в моем примере это Flowable<T>
, который производит предметы типа T
. Потребитель потребляет товары, которые производит издатель. doOnNext()
является функцией издателя . Там написано: «Когда вы что-то публикуете, тоже делайте этот побочный эффект». В моем примере побочным эффектом является выдача номера следующей страницы для извлечения. Вы не должны обрабатывать сохранение БД там, вы должны написать свою собственную функцию обратного вызова ( Потребитель ) или Подписчик , чтобы обработать ее, и предоставить это для вызова подписки.