У меня есть Observable, который в какой-то момент должен записывать вещи в кеш - и мы хотели бы подождать, пока записи будут выполнены, прежде чем завершить всю операцию с наблюдаемой (для целей отчетности).
Для Цель теста: запись в кеш Completable выглядит следующим образом:
Completable.create(
emitter ->
new Thread(
() -> {
try {
Thread.sleep(2000);
doSomething();
emitter.onComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.start());
Поскольку у меня есть несколько записей в кэш, я пытаюсь объединить их в класс контейнера:
public class CacheInsertionResultsTracker {
private Completable cacheInsertResultsCompletable;
public CacheInsertionResultsTracker() {
this.cacheInsertResultsCompletable = Completable.complete();
}
public synchronized void add(Completable cacheInsertResult) {
this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
}
public Completable getCompletable() {
return this.cacheInsertResultsCompletable;
}
}
И я попробуйте объединить его с Observable следующим образом:
CacheInsertionResultsTracker tracker = new ...;
observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.mergeWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);
Как я могу убедиться, что к моменту завершения finishWithSuccess doSomething завершен? Проблема в том, что ссылка Completable обновляется каждый раз, когда я добавляю новую, и это происходит после запуска mergeWith ...