Rx Java: объединение Observable с Completable не работает - PullRequest
2 голосов
/ 06 февраля 2020

У меня есть Observable, который в какой-то момент должен записывать вещи в кеш - и мы хотели бы подождать, пока записи будут выполнены, прежде чем завершить всю операцию с наблюдаемой (для целей отчетности).

Для Цель теста: запись в кеш Completable выглядит следующим образом:

   Completable.create(
                  emitter ->
                      new Thread(
                              () -> {
                                try {
                                  Thread.sleep(2000);
                                  doSomething();
                                  emitter.onComplete();
                                } catch (InterruptedException e) {
                                  e.printStackTrace();
                                }
                              })
                          .start());

Поскольку у меня есть несколько записей в кэш, я пытаюсь объединить их в класс контейнера:

public class CacheInsertionResultsTracker {

  private Completable cacheInsertResultsCompletable;

  public CacheInsertionResultsTracker() {
    this.cacheInsertResultsCompletable = Completable.complete();
  }

  public synchronized void add(Completable cacheInsertResult) {
    this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
  }

  public Completable getCompletable() {
    return this.cacheInsertResultsCompletable;
  }
}

И я попробуйте объединить его с Observable следующим образом:

CacheInsertionResultsTracker tracker = new ...;
    observable
        .doOnNext(next->tracker.add(next.writeToCache(...)))
        .mergeWith(Completable.defer(()->tracker.getCompletable()))
        .subscribe(
            // on next
            this::logNextElement
            // on error
            this::finishWithError
            // on complete
            this::finishWithSuccess
            );

Как я могу убедиться, что к моменту завершения finishWithSuccess doSomething завершен? Проблема в том, что ссылка Completable обновляется каждый раз, когда я добавляю новую, и это происходит после запуска mergeWith ...

Ответы [ 2 ]

1 голос
/ 06 февраля 2020

Решение, которое, по-видимому, работает для нашего варианта использования, заключается в использовании concatWith + defer:

observable
    .doOnNext(next->tracker.add(next.writeToCache(...)))
    .concatWith(Completable.defer(()->tracker.getCompletable()))
    .subscribe(
        // on next
        this::logNextElement
        // on error
        this::finishWithError
        // on complete
        this::finishWithSuccess
        );

Concat гарантирует, что подписка на Completable происходит только после завершения Observable, и откладывает откладывание получения окончательно Завершается до этой подписки (поэтому все объекты уже добавлены в трекер).

0 голосов
/ 06 февраля 2020

Исходя из комментариев, вы можете заменить завершаемый кеш на ReplaySubject<Completable>, сделать некоторый таймаут для обнаружения неактивности и получить наблюдаемый конец последовательности.

 ReplaySubject<Completable> cache = ReplaySubject.create();

 cache.onNext(completable);

 observable.mergeWith(
     cache.flatMapCompletable(v -> v)
          .timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
 )

Редактировать:

Ваш обновленный пример подразумевает, что вы хотите запустить Completable s в ответ на элементы в главном observable, изолированные на эту последовательность, и ждать завершения всех из них. Это типичный вариант использования для flatMap:

  observable.flatMap(
       next -> next.writeToCache(...).andThen(Observable.just(next))
  )
  .subscribe(
        this::logNextElement
        // on error
        this::finishWithError
        // on complete
        this::finishWithSuccess
   );
...