Объедините базу данных и сетевой вызов с RxJava2 - PullRequest
0 голосов
/ 30 августа 2018

У меня есть 2 источника данных: база данных (кеш) и API, и мне нужно объединить их в один поток. Я знаю, что могу просто использовать concatArray или что-то подобное, но я хочу добиться более сложного поведения:

  • Наблюдаемый поток, который будет излучать до 2 элементов.

  • Вначале он подпишется на оба источника.

  • Если вызов API будет достаточно быстрым (<~ 300 мс), он будет излучать только данные из него и завершать поток. </p>

  • Если вызов API будет медленным (> ~ 300 мс), отправьте данные из базы данных и продолжайте ждать данные из API
  • Если вызов API не удастся, отправьте данные из базы данных и выдайте ошибку.
  • Если база данных каким-то образом будет работать медленнее, чем API, она не может выдавать свои данные (завершение потока решает ее)

Я выполнил это с помощью следующего кода:

    public Observable<Entity> getEntity() {
    final CompositeDisposable disposables = new CompositeDisposable();
    return Observable.<Entity>create(emitter -> {
        final Entity[] localEntity = new Entity[1];

        //database call:
        disposables.add(database.getEntity()
                .subscribeOn(schedulers.io())
                .doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because 
                                                        //apiService can emit error before 300 ms 
                .delay(300, MILLISECONDS)
                .subscribe((entity, throwable) -> {
                    if (entity != null && !emitter.isDisposed()) {
                        emitter.onNext(entity);
                    }
                }));

        //network call:
        disposables.add(apiService.getEntity()
                .subscribeOn(schedulers.io())
                .onErrorResumeNext(throwable -> {
                    return Single.<Entity>error(throwable) //we will delay error here
                            .doOnError(throwable1 -> {
                                if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity
                            })
                            .delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error
                })
                .subscribe(entity -> {
                    emitter.onNext(entity); 
                    emitter.onComplete(); //we got entity from api, so we can complete the stream
                }, emitter::onError));
    })
            .doOnDispose(disposables::clear)
            .subscribeOn(schedulers.io());
}

Код немного неуклюжий, и я создаю здесь наблюдаемые внутри наблюдаемой, что, я думаю, плохо . Но таким образом у меня есть глобальный доступ к эмиттеру, который позволяет мне управлять основным потоком (передача данных, успех, ошибка) так, как я хочу.

Есть ли лучший способ добиться этого? Я хотел бы увидеть несколько примеров кода. Спасибо!

Ответы [ 3 ]

0 голосов
/ 31 августа 2018

Другое решение может быть таким (без темы):

public static void main(String[] args) throws InterruptedException {
    Database database = new Database(Single.just(new Entity("D1")));
    ApiService apiService = new ApiService(Single.just(new Entity("A1")));
    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

    database.getEntity()
            .toObservable()
            .groupJoin(apiService.getEntity()
                                 .toObservable()
                                 .onErrorResumeNext(
                                    err -> Observable.concatDelayError(database.getEntity().toObservable(),
                                                                       Observable.error(err))),
                       dbDuration -> Observable.timer(300, MILLISECONDS),
                       apiDuration -> Observable.never(),
                       (db, api) -> api.switchIfEmpty(Observable.just(db)))
            .flatMap(o -> o)
            .subscribe(System.out::println,
                       Throwable::printStackTrace,
                       () -> System.out.println("It's the end!"));

    Observable.timer(1, MINUTES) // just for blocking the main thread
              .toBlocking()
              .subscribe();
}

Если в течение 300 мс (dbDuration -> timer(300, MILLISECONDS)) из службы API ничего не отправляется, то сущность из базы данных отправляется (api.switchIfEmpty(db)).

Если api испускает что-то в течение 300 мс, тогда испускается только его Entity (api.switchIfEmpty(.)).

Кажется, это работает, как вы хотите ...

0 голосов
/ 17 октября 2018

Еще более приятное решение:

public static void main(String[] args) throws InterruptedException {
    Database database = new Database(Single.just(new Entity("D1")));
    ApiService apiService = new ApiService(Single.just(new Entity("A1")));
    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

    Observable<Entity> apiServiceWithDbAsBackup =
            apiService.getEntity()
                      .toObservable()
                      .onErrorResumeNext(err -> 
                            Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));

    Observable.amb(database.getEntity()
                           .toObservable()
                           .delay(300, MILLISECONDS)
                           .concatWith(apiServiceWithDbAsBackup),
                   apiServiceWithDbAsBackup)
              .subscribe(System.out::println,
                         Throwable::printStackTrace,
                         () -> System.out.println("It's the end!"));

Мы используем amb() с задержкой для того, чтобы база данных наблюдалась, чтобы взять первую, которая будет излучать. В случае ошибки службы API, мы выдаем элемент из базы данных. Кажется, это работает, как вы хотите ...

0 голосов
/ 31 августа 2018

Может быть код ниже может сделать работу. Исходя из ваших требований, я предположил, что API и база данных имеют дело с Single<Entity>.

private static final Object STOP = new Object();

public static void main(String[] args) {
    Database database = new Database(Single.just(new Entity("D1")));
    ApiService apiService = new ApiService(Single.just(new Entity("A1")));
    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));
    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
    BehaviorSubject<Object> subject = BehaviorSubject.create();

    Observable.merge(
        apiService.getEntity()
                  .toObservable()
                  .doOnNext(t -> subject.onNext(STOP))
                  .doOnError(e -> subject.onNext(STOP))
                  .onErrorResumeNext(t ->
                                        Observable.concatDelayError(database.getEntity().toObservable(),
                                                                    Observable.error(t))),
        database.getEntity()
                .delay(300, MILLISECONDS)
                .toObservable()
                .takeUntil(subject)
    )
    .subscribe(System.out::println, 
               System.err::println);

    Observable.timer(1, MINUTES) // just for blocking the main thread
              .toBlocking()
              .subscribe();
}

Мне не удалось отменить использование Subject из-за условий «Если база данных будет работать медленнее, чем API, она не сможет выдать свои данные» и «Если вызов API будет медленным (> ~ 300 мс), выдают данные из базы данных и все еще ждут данных из API ". В противном случае, оператор amb() будет хорошим вариантом.

Надеюсь, это поможет.

...