Как дросселировать запросы к холодному потоку вверх по течению - PullRequest
0 голосов
/ 01 августа 2020

У меня есть холодный Flowable, сделанный методом генерации. Я хочу, чтобы он оставался холодным, но также хочу "задушить" запросы потребителя на дополнительные элементы. Например, если потребителю (подписчику?) Требуется 400 мсек для обработки элемента, а регулирование установлено на 1 секунду, я ожидаю увидеть шкалу времени примерно так:

0 - generate callback called to generate next value
1 - consumer starts processing generated value
401 - consumer finished processing value
401 - consumer requests next item
1000 - generate callback called to generate next value

Это пример кода Я играю, чтобы понять это:

    val startTime = System.currentTimeMillis()
    fun log(msg: String) {
        println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
    }

    val generator = Flowable.generate<Int, Int>(
        Callable { 0 },
        BiFunction { state, emitter ->
            val value = state + 1
            log("generating $value")
            emitter.onNext(value)
            return@BiFunction value
        })

    val subscription = generator
        .concatMap { Flowable.concat(Flowable.just(it), Flowable.empty<Int>().delay(1, TimeUnit.SECONDS)) }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io(), true, 1)
        .subscribe {
            log("processing $it")
            if (it % 5 == 0) {
                log("starting sleep")
                try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
                log("done sleeping")
            }
        }

    Thread.sleep(8_000)
    subscription.dispose()
    log("done")

Результат, который я получаю сейчас, следующий:

RxComputationThreadPool-1 -  278 - generating 1
RxCachedThreadScheduler-1 -  283 - processing 1
RxComputationThreadPool-1 -  285 - generating 2
RxComputationThreadPool-2 - 1287 - generating 3
RxComputationThreadPool-2 - 1288 - generating 4
RxCachedThreadScheduler-1 - 1288 - processing 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-4 - 3288 - generating 5
RxComputationThreadPool-4 - 3288 - generating 6
RxCachedThreadScheduler-1 - 3288 - processing 4
RxCachedThreadScheduler-1 - 4289 - processing 5
RxCachedThreadScheduler-1 - 4289 - starting sleep
RxComputationThreadPool-6 - 5290 - generating 7
RxComputationThreadPool-6 - 5290 - generating 8
RxCachedThreadScheduler-1 - 6489 - done sleeping
RxCachedThreadScheduler-1 - 6489 - processing 6
RxCachedThreadScheduler-1 - 7490 - processing 7
main - 8265 - done

Результат, который я хочу, чтобы получить, - это что-то это:

RxComputationThreadPool-1 -  278 - generating 1
RxCachedThreadScheduler-1 -  283 - processing 1
RxComputationThreadPool-1 - 1285 - generating 2 // 2 generated one second after 1
RxCachedThreadScheduler-2 - 1287 - processing 2 // but once generated, processed immediately
RxComputationThreadPool-2 - 2288 - generating 3 // 3 generated one second after 2
RxCachedThreadScheduler-1 - 2288 - processing 3 
RxComputationThreadPool-1 - 3289 - generating 4 // 4 generated one second after 3
RxCachedThreadScheduler-4 - 3289 - processing 4
RxComputationThreadPool-4 - 4290 - generating 5 // 5 generated one second after 4
RxCachedThreadScheduler-1 - 4290 - processing 5
RxCachedThreadScheduler-1 - 4290 - starting sleep // item 5 takes longer to process
RxCachedThreadScheduler-1 - 6491 - done sleeping  // 2200ms later its done
RxComputationThreadPool-6 - 6492 - generating 6 // now that consumer is done, it requests next item and gets generated immediately since it has been 1 second since last request
RxCachedThreadScheduler-1 - 6492 - processing 6
RxComputationThreadPool-1 - 7492 - generating 7 // 7 generated one second after 6
RxCachedThreadScheduler-4 - 7493 - processing 7
main - 8265 - done

1 Ответ

1 голос
/ 02 августа 2020

Один из преобразователей RxJavaExtensions (версия RxJava2) должен быть решением вашей проблемы - spanout(). Он вставляет задержку между выбросами от восходящего потока. Я изменил только одну строку в вашем коде (заменил concatMap() на spanout()):

val startTime = System.currentTimeMillis()
fun log(msg: String) {
    println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}

val generator = Flowable.generate<Int, Int>(
    Callable { 0 },
    BiFunction { state, emitter ->
        val value = state + 1
        log("generating $value")
        emitter.onNext(value)
        return@BiFunction value
    })

val subscription = generator
    .compose(FlowableTransformers.spanout(1, 1, TimeUnit.SECONDS)) // <– changed line
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io(), true, 1)
    .subscribe {
        log("processing $it")
        if (it % 5 == 0) {
            log("starting sleep")
            try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
            log("done sleeping")
        }
    }

Thread.sleep(8_000)
subscription.dispose()
log("done")

Произведенный вывод:

RxComputationThreadPool-1 -  153 - generating 1
RxCachedThreadScheduler-1 - 1178 - processing 1
RxComputationThreadPool-1 - 1179 - generating 2
RxCachedThreadScheduler-1 - 2176 - processing 2
RxComputationThreadPool-1 - 2177 - generating 3
RxCachedThreadScheduler-1 - 3177 - processing 3
RxComputationThreadPool-1 - 3178 - generating 4
RxCachedThreadScheduler-1 - 4175 - processing 4
RxComputationThreadPool-1 - 4175 - generating 5
RxCachedThreadScheduler-1 - 5177 - processing 5
RxCachedThreadScheduler-1 - 5178 - starting sleep
RxCachedThreadScheduler-1 - 7383 - done sleeping
RxComputationThreadPool-1 - 7384 - generating 6
RxCachedThreadScheduler-1 - 7384 - processing 6
RxComputationThreadPool-1 - 7385 - generating 7
main - 8151 - done
...