У меня есть фиктивный сетевой источник данных:
fun networkDataSource(): Single<List<Int>> {
return Single.just((0 until 100).toList())
.delay(150, TimeUnit.MILLISECONDS)
}
Здесь можно наблюдать бесконечно.Его основное применение заключается в том, что его вычисления должны быть «защищены», чтобы вычислял свое единственное значение только один раз . (Здесь значение равно 1.)
val endless = Observable
.just(1)
.observeOn(Schedulers.io())
.delay(500, TimeUnit.MILLISECONDS)
// Counts as heavy operation, do not calculate this here once again
.doOnNext { println("=> E: Calculated once") }
.cache()
//.doOnNext { println("=> E: From cache") }
.repeat()
Основной поток просто выводит значения:
val mainStream = Observable.range(0, 6)
.doOnNext { println("=> M: Main stream $it") }
Задача:
Скомбинируйте 3 наблюдаемые вместе и оптимизируйте использование сети, чтобы она не вызывалась больше, чем необходимо.(Как только число данных - целых чисел в этом случае - соблюдается.
Подход:
mainStream
.concatMap {index ->
Observables.zip(
Observable.just(index),
endless,
networkDataSource()
.toObservable()
.doOnNext { println("#> N: Network data fetch $index") }
)
}
.doOnNext { println("=> After concatmap: ${it.first}") }
.take(4)
.doOnNext { println("=> After take: ${it.first}") }
.subscribe(
{ println("=> Last onnext") },
{ it.printStackTrace() },
{ synchronized(check) { check.notifyAll() } }
)
Завершение заблокированного потока - требуется только для тестирования:
synchronized(check) {
check.wait()
}
println("Ending")
Вот вывод:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
Это вывод, и он зависает после второго дубля. (Не продолжается в течение минуты). Мой вопрос, почему это происходит?
Как примечание, если я раскомментирую строку из endless
наблюдаемой:
.doOnNext { println("=> E: From cache") }
Она зальет консоль этой строкой. Почему endless
вызывается так много раз вместо каждогоитерация?
flatMap()
не является здесь решением, поскольку она не учитывает take(4)
и завершает все сетевые вызовы.
Так как я могу заставить concatMap()
работать?
(я также добавил тег RxJS , потому что это реактивная проблема и абсолютно не касается Kotlin. Решения JS также приветствуются, если эти функции существуют в библиотеке RxJava.)
Редактировать.:
Я посмотрел на код, и 2 выхода, вероятно,Только из-за параметра prefetch
:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
Но я до сих пор не понимаю, как это работает.Я только что прочитал, что concatMap()
- это flatmap()
, но он ждет результатов каждого.