RxJava2: выполнить операцию с данными, которые были успешно отправлены в REST API - PullRequest
0 голосов
/ 06 сентября 2018

У меня есть конкретная проблема, для которой я не могу найти чистое решение.

Observable.interval(1, TimeUnit.SECONDS)
    .flatMap { constructData() }
    .subscribe { data ->
        api.syncData(data)
            .repeat()
            .subscribe { response ->
                deleteSyncedData(data)
            }
    }

Итак, как вы можете видеть из кода, мне нужно создать некоторый пакет данных, чтобы отправить его бэкэнду, и после его правильной отправки я удаляю его из своей локальной системы хранения. Прямо сейчас это работает, но выглядит немного как ад обратного вызова - что, если я хотел бы объединить ответ с данными, а затем сделать запрос?

Кто-нибудь видит лучшее решение для такой операции?

Кроме того, я хотел бы знать, как переключаться между планировщиками между этими операциями? Я хочу выполнить constructData() и deleteSyncedData() на Schedulers.computation(), но api.syncData(data) на Schedulers.io().

1 Ответ

0 голосов
/ 06 сентября 2018

Из документа Rx для подписки :

Вы не можете использовать subscribeOn() несколько раз в chain. Технически, вы можете сделать это, но это не будет иметь никакого дополнительного эффекта. В вашем коде, если вы объединяете в цепочку два разных Schedulers с использованием subscribeOn(), то только тот, который закрыт для источника observable, будет действовать и ничего больше.

Но вы можете добиться того же, используя observeOn(). Вы можете переключаться между несколькими Schedulers, используя observeOn() и, наконец, observe результат на MainThread

Пример:

Observable.interval(1, TimeUnit.SECONDS)
    .observeOn(Schedulers.computation())
    .flatMap { constructData() }
    .observeOn(Schedulers.io())
    .flatMap { data -> api.syncData(data).map { data } }
    .observeOn(Schedulers.computation())
    .flatMap { data -> deleteSyncedData(data) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { data ->

    }

Но, на мой взгляд, это тоже хорошо.

Observable.interval(1, TimeUnit.SECONDS)
    .flatMap { constructData() }
    .subscribe { data ->
        api.syncData(data)
            .repeat()
            .subscribe { response ->
                deleteSyncedData(data)
            }
    }
...