Мой текущий проект Android имеет различные типы справочных данных, хранящихся в базе данных Firebase Realtime.
Приложение Android хранит собственную локальную копию этих справочных данных, которая должна обновляться на основе даты последнего обновления.
Я хочу использовать RxJava для управления параллельным обновлением этих справочных данных.
После успешного обновления всех справочных данных я хочу сбросить дату последнего обновления локально до сегодняшней даты.
Решение, которое у меня есть в настоящее время, не работает должным образом.Я не могу сбросить дату последнего обновления, чтобы дождаться успешного обновления справочных данных.
Мой код в настоящее время похож на это: -
Observable.just(Boolean.TRUE)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.doOnNext(refresh -> referenceTypeA())
.doOnNext(refresh -> referenceTypeB())
.doOnNext(refresh -> referenceTypeC())
.doOnNext(refresh -> recordTodaysDate())
.subscribe();
Каждый из referenceTypeX()
методов имеетта же структура, что и в следующем случае: -
private void referenceTypeX() {
firebaseService
.getReferenceTypeX()
.doAfterSuccess(this::referenceTypeX)
.doOnError(this::processError)
.subscribe();
}
public Single<List<ReferenceTypeX>> getReferenceTypeX() {
return Single.create(emitter ->
mReferenceTypeXDatabaseReference
.orderByChild("typeXKey")
.addValueEventListener(buildTypeXListener(emitter)));
}
В этом и заключается проблема, поскольку обратный вызов Firebase выполняется в главном потоке, а некоторые из справочных данных имеют 1000 строк, и я обрабатываю событие изменения данных на фоне.выполните следующие действия: -
public class TypeXListener extends BaseValueEventListener<List<ReferenceTypeX>> {
private static final TypeXMapper TYPE_X_MAPPER = Selma.builder(TypeXMapper.class).build();
private static final List<ReferenceTypeX> TYPE_X = new ArrayList<>();
public TypeXListener(final SingleEmitter<List<ReferenceTypeX>> emitter) {
super(emitter);
}
@Override
public void onDataChange(final DataSnapshot dataSnapshot) {
final GenericTypeIndicator<TypeXCsv> typeXTypeIndicator = new GenericTypeIndicator<TypeXCsv>() {
};
final Thread thread = new Thread(() -> {
for (final DataSnapshot typeXDataSnapshot : dataSnapshot.getChildren()) {
final TypeXCsv typeXCsv = typeXDataSnapshot.getValue(typeXDataSnapshot);
final TypeX typeX = TypeXMapper.mapTypeX(typeXCsv);
TYPE_X_s.add(typeX);
}
emitter.onSuccess(TYPE_X_s);
});
thread.start();
}
}
Я считаю, что мне нужно создать Observable для каждого из referenceTypeX()
методов и передать их моему Observable.just()
, а затем использовать flatMap () дляраспараллеливать обновления справочных данных.
Как мне этого добиться?
Особенно, когда мне нужен обратный вызов Firebase в реальном времени для запуска в фоновом потоке, а не в основном потоке?
Я считаю, что мое решение будет выглядеть примерно так:
Observable.just(referenceTypeA(), referenceTypeB(), referenceTypeC())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(?????????????)
.doOnComplete(recordTodaysDate())
.subscribe();