Как обработать все элементы списка, а затем дополнить их RxJava - PullRequest
0 голосов
/ 20 декабря 2018

Я изучаю использование RxJava в моем текущем приложении для Android.

Я застрял в следующем сценарии использования.

Для каждой строки данных в конкретной таблице базы данных, которую я хочу выполнитьвызов HTTP POST, когда все POST завершены, OK, мне нужно очистить таблицу базы данных.

У меня следующий код: -

login()
                .andThen(Single.defer(() -> DatabaseController.fetchSingleRealmObjects(UpdateDO.class)))
                .toObservable()
                .flatMapIterable(update -> update)
                .flatMap(this::parameteriseUpdate)
                .doOnNext(NetworkController::update)
                .doOnComplete(() -> DatabaseController.deleteAll(UpdateDO.class))
                .ignoreElements()
                .retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(retryCounter, e)))
                .doOnComplete(onComplete)
                .doOnError(onError)
                .doAfterTerminate(doAfterTerminate())
                .doOnSubscribe(compositeDisposable::add)
                .blockingAwait();

КогдаТаблица UpdateDO пуста, приведенный выше код завершается, как и ожидалось.

Однако, когда существуют строки данных, процесс "залипает" в doOnNext()

Я понимаю, что это потому, что я вызываю только emitter.onNext()

private ObservableSource<Map<String, Object>> parameteriseUpdate(final UpdateDO updateDO) {
        final Map<String, Object> fields = new HashMap<>();
        fields.put(FIELD_NAME_DRUG_ID, updateDO.getDrugId());
        fields.put(FIELD_NAME_STORE_CONTENT_ID, updateDO.getStoreContentId());
        fields.put(FIELD_NAME_STORE_ID, updateDO.getStoreID());
        fields.put(FIELD_NAME_ACTUAL_QUANTITY, updateDO.getActualQty());
        fields.put(FIELD_NAME_VARIANCE, updateDO.getUnitQty());
        fields.put(FIELD_NAME_REMARKS, updateDO.getRemarks());
        fields.put(FIELD_NAME_CREATED_BY, updateDO.getCreatedBy());

        return Observable.create(emitter -> emitter.onNext(fields));
    }

Я не могу понять, как это исправить, как мне сделать рефакторинг моего кода, чтобы позволить мне вызывать emitter.onComplete()?

1 Ответ

0 голосов
/ 20 декабря 2018

Я думаю, что лучше изменить эту функцию с возврата ObservableSource<Map<String, Object>> на простую функцию, подобную этой

private Map<String, Object> parameteriseUpdate(final UpdateDO updateDO) {
    final Map<String, Object> fields = new HashMap<>();
    fields.put(FIELD_NAME_DRUG_ID, updateDO.getDrugId());
    fields.put(FIELD_NAME_STORE_CONTENT_ID, updateDO.getStoreContentId());
    fields.put(FIELD_NAME_STORE_ID, updateDO.getStoreID());
    fields.put(FIELD_NAME_ACTUAL_QUANTITY, updateDO.getActualQty());
    fields.put(FIELD_NAME_VARIANCE, updateDO.getUnitQty());
    fields.put(FIELD_NAME_REMARKS, updateDO.getRemarks());
    fields.put(FIELD_NAME_CREATED_BY, updateDO.getCreatedBy());

    return fields;
}

, и вызывать ее как карту, а не плоскую карту, например:

.map(this::parameteriseUpdate)

потому что в вашем случае вы создаете много потоков, которые никогда не завершаются.

...