Как объединить обратные вызовы базы данных Firebase Realtime с RxJava2 - PullRequest
0 голосов
/ 24 мая 2018

Мой текущий проект Android имеет различные типы справочных данных, хранящихся в базе данных Firebase Realtime.

Приложение Android хранит собственную локальную копию этих справочных данных, которая должна обновляться на основе даты последнего обновления.

Я хочу использовать RxJava для управления параллельным обновлением этих справочных данных.

После успешного обновления всех справочных данных я хочу сбросить дату последнего обновления локально до сегодняшней даты.

Решение, которое у меня есть в настоящее время, не работает должным образом.Я не могу сбросить дату последнего обновления, чтобы дождаться успешного обновления справочных данных.

Мой код в настоящее время похож на это: -

    Observable.just(Boolean.TRUE)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .doOnNext(refresh -> referenceTypeA())
            .doOnNext(refresh -> referenceTypeB())
            .doOnNext(refresh -> referenceTypeC())
            .doOnNext(refresh -> recordTodaysDate())
            .subscribe();

Каждый из referenceTypeX() методов имеетта же структура, что и в следующем случае: -

private void referenceTypeX() {

        firebaseService
                .getReferenceTypeX()
                .doAfterSuccess(this::referenceTypeX)
                .doOnError(this::processError)
                .subscribe();
    }


public Single<List<ReferenceTypeX>> getReferenceTypeX() {
    return Single.create(emitter ->
            mReferenceTypeXDatabaseReference
                    .orderByChild("typeXKey")
                    .addValueEventListener(buildTypeXListener(emitter)));
}

В этом и заключается проблема, поскольку обратный вызов Firebase выполняется в главном потоке, а некоторые из справочных данных имеют 1000 строк, и я обрабатываю событие изменения данных на фоне.выполните следующие действия: -

public class TypeXListener extends BaseValueEventListener<List<ReferenceTypeX>> {
    private static final TypeXMapper TYPE_X_MAPPER = Selma.builder(TypeXMapper.class).build();
    private static final List<ReferenceTypeX> TYPE_X = new ArrayList<>();

    public TypeXListener(final SingleEmitter<List<ReferenceTypeX>> emitter) {
        super(emitter);
    }

    @Override
    public void onDataChange(final DataSnapshot dataSnapshot) {
        final GenericTypeIndicator<TypeXCsv> typeXTypeIndicator = new GenericTypeIndicator<TypeXCsv>() {
        };

        final Thread thread = new Thread(() -> {
            for (final DataSnapshot typeXDataSnapshot : dataSnapshot.getChildren()) {
                final TypeXCsv typeXCsv = typeXDataSnapshot.getValue(typeXDataSnapshot);
                final TypeX typeX = TypeXMapper.mapTypeX(typeXCsv);

                TYPE_X_s.add(typeX);
            }

            emitter.onSuccess(TYPE_X_s);
        });

        thread.start();

    }
}

Я считаю, что мне нужно создать Observable для каждого из referenceTypeX() методов и передать их моему Observable.just()

, а затем использовать flatMap () дляраспараллеливать обновления справочных данных.

Как мне этого добиться?

Особенно, когда мне нужен обратный вызов Firebase в реальном времени для запуска в фоновом потоке, а не в основном потоке?

Я считаю, что мое решение будет выглядеть примерно так:

Observable.just(referenceTypeA(), referenceTypeB(), referenceTypeC())
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(?????????????)
                .doOnComplete(recordTodaysDate())
                .subscribe();

1 Ответ

0 голосов
/ 28 мая 2018

Вам нужно изменить функции referenceTypeX(), чтобы они не подписывались на функцию:

private Single<List<ReferenceTypeX>> referenceTypeX() {
    return firebaseService
            .getReferenceTypeX()
            .doAfterSuccess(this::referenceTypeX)
            .doOnError(this::processError)
            .subscribe(); // <-- remove this, just return
}

Затем вы можете построить основную наблюдаемую комбинацию следующих функций:

Single
    .zip(
        referenceTypeA().subscribeOn(Schedulers.io()),
        referenceTypeB().subscribeOn(Schedulers.io()),
        referenceTypeC().subscribeOn(Schedulers.io()),
        Function3<A,B,C,Result> { combineIntoResult(...) })
    .flatMap(???) // is this necessary?
    .map(recordTodaysDate())
    .subscribe();
...