Я изучаю использование RxJava в моем текущем приложении для Android.
Я застрял в следующем сценарии использования.
Для каждой строки данных в конкретной таблице базы данных, которую я хочу выполнитьвызов HTTP POST, когда все POST завершены, OK, мне нужно очистить таблицу базы данных.
У меня следующий код: -
login()
.andThen(Single.defer(() -> DatabaseController.fetchSingleRealmObjects(UpdateDO.class)))
.toObservable()
.flatMapIterable(update -> update)
.flatMap(this::parameteriseUpdate)
.doOnNext(NetworkController::update)
.doOnComplete(() -> DatabaseController.deleteAll(UpdateDO.class))
.ignoreElements()
.retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(retryCounter, e)))
.doOnComplete(onComplete)
.doOnError(onError)
.doAfterTerminate(doAfterTerminate())
.doOnSubscribe(compositeDisposable::add)
.blockingAwait();
КогдаТаблица UpdateDO пуста, приведенный выше код завершается, как и ожидалось.
Однако, когда существуют строки данных, процесс "залипает" в doOnNext()
Я понимаю, что это потому, что я вызываю только emitter.onNext()
private ObservableSource<Map<String, Object>> parameteriseUpdate(final UpdateDO updateDO) {
final Map<String, Object> fields = new HashMap<>();
fields.put(FIELD_NAME_DRUG_ID, updateDO.getDrugId());
fields.put(FIELD_NAME_STORE_CONTENT_ID, updateDO.getStoreContentId());
fields.put(FIELD_NAME_STORE_ID, updateDO.getStoreID());
fields.put(FIELD_NAME_ACTUAL_QUANTITY, updateDO.getActualQty());
fields.put(FIELD_NAME_VARIANCE, updateDO.getUnitQty());
fields.put(FIELD_NAME_REMARKS, updateDO.getRemarks());
fields.put(FIELD_NAME_CREATED_BY, updateDO.getCreatedBy());
return Observable.create(emitter -> emitter.onNext(fields));
}
Я не могу понять, как это исправить, как мне сделать рефакторинг моего кода, чтобы позволить мне вызывать emitter.onComplete()
?