RxJava: concatMap () с zip () застревает - PullRequest
0 голосов
/ 29 сентября 2018

У меня есть фиктивный сетевой источник данных:

    fun networkDataSource(): Single<List<Int>> {
        return Single.just((0 until 100).toList())
                .delay(150, TimeUnit.MILLISECONDS)
    }

Здесь можно наблюдать бесконечно.Его основное применение заключается в том, что его вычисления должны быть «защищены», чтобы вычислял свое единственное значение только один раз . (Здесь значение равно 1.)

    val endless = Observable
            .just(1)
            .observeOn(Schedulers.io())
            .delay(500, TimeUnit.MILLISECONDS)
            // Counts as heavy operation, do not calculate this here once again
            .doOnNext { println("=> E: Calculated once") }
            .cache()
            //.doOnNext { println("=> E: From cache") }
            .repeat()

Основной поток просто выводит значения:

    val mainStream = Observable.range(0, 6)
            .doOnNext { println("=> M: Main stream $it") }

Задача:

Скомбинируйте 3 наблюдаемые вместе и оптимизируйте использование сети, чтобы она не вызывалась больше, чем необходимо.(Как только число данных - целых чисел в этом случае - соблюдается.

Подход:

    mainStream
            .concatMap {index ->
                Observables.zip(
                        Observable.just(index),
                        endless,
                        networkDataSource()
                                .toObservable()
                                .doOnNext { println("#> N: Network data fetch $index") }
                )
            }
            .doOnNext { println("=> After concatmap: ${it.first}") }
            .take(4)
            .doOnNext { println("=> After take: ${it.first}") }
            .subscribe(
                    { println("=> Last onnext") },
                    { it.printStackTrace() },
                    { synchronized(check) { check.notifyAll() } }
            )

Завершение заблокированного потока - требуется только для тестирования:

synchronized(check) {
    check.wait()
}
println("Ending")

Вот вывод:

=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext

Это вывод, и он зависает после второго дубля. (Не продолжается в течение минуты). Мой вопрос, почему это происходит?

Как примечание, если я раскомментирую строку из endless наблюдаемой:

.doOnNext { println("=> E: From cache") }

Она зальет консоль этой строкой. Почему endless вызывается так много раз вместо каждогоитерация?

flatMap() не является здесь решением, поскольку она не учитывает take(4) и завершает все сетевые вызовы.

Так как я могу заставить concatMap() работать?

(я также добавил тег RxJS , потому что это реактивная проблема и абсолютно не касается Kotlin. Решения JS также приветствуются, если эти функции существуют в библиотеке RxJava.)

Редактировать.:

Я посмотрел на код, и 2 выхода, вероятно,Только из-за параметра prefetch:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

Но я до сих пор не понимаю, как это работает.Я только что прочитал, что concatMap() - это flatmap(), но он ждет результатов каждого.

1 Ответ

0 голосов
/ 01 октября 2018

Из комментариев:

Вся установка, скорее всего, будет выполняться в том же потоке после первого элемента, а repeat в endless никогда не откажется от потока, не позволяя любому другому оператору продолжить работу.Мне не имеет смысла повторять это cache, потому что вы будете когда-либо использовать только один его единственный элемент.

...