У меня есть 2 источника данных: база данных (кеш) и API, и мне нужно объединить их в один поток. Я знаю, что могу просто использовать concatArray или что-то подобное, но я хочу добиться более сложного поведения:
Наблюдаемый поток, который будет излучать до 2 элементов.
Вначале он подпишется на оба источника.
Если вызов API будет достаточно быстрым (<~ 300 мс), он будет излучать только данные из него и завершать поток. </p>
- Если вызов API будет медленным (> ~ 300 мс), отправьте данные из базы данных и продолжайте ждать данные из API
- Если вызов API не удастся, отправьте данные из базы данных и выдайте ошибку.
- Если база данных каким-то образом будет работать медленнее, чем API, она не может выдавать свои данные (завершение потока решает ее)
Я выполнил это с помощью следующего кода:
public Observable<Entity> getEntity() {
final CompositeDisposable disposables = new CompositeDisposable();
return Observable.<Entity>create(emitter -> {
final Entity[] localEntity = new Entity[1];
//database call:
disposables.add(database.getEntity()
.subscribeOn(schedulers.io())
.doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because
//apiService can emit error before 300 ms
.delay(300, MILLISECONDS)
.subscribe((entity, throwable) -> {
if (entity != null && !emitter.isDisposed()) {
emitter.onNext(entity);
}
}));
//network call:
disposables.add(apiService.getEntity()
.subscribeOn(schedulers.io())
.onErrorResumeNext(throwable -> {
return Single.<Entity>error(throwable) //we will delay error here
.doOnError(throwable1 -> {
if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity
})
.delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error
})
.subscribe(entity -> {
emitter.onNext(entity);
emitter.onComplete(); //we got entity from api, so we can complete the stream
}, emitter::onError));
})
.doOnDispose(disposables::clear)
.subscribeOn(schedulers.io());
}
Код немного неуклюжий, и я создаю здесь наблюдаемые внутри наблюдаемой, что, я думаю, плохо . Но таким образом у меня есть глобальный доступ к эмиттеру, который позволяет мне управлять основным потоком (передача данных, успех, ошибка) так, как я хочу.
Есть ли лучший способ добиться этого? Я хотел бы увидеть несколько примеров кода. Спасибо!