Как ограничить Single.Zip паралеллизм? - PullRequest
0 голосов
/ 22 мая 2019

Я выполняю несколько http-запросов, ожидаю завершения всех запросов и получаю информацию из всех запросов (и нескольких других источников).

В настоящее время я делаю это так:

Single.zip(observables, { array -> array })

Где наблюдаемые - это просто массив наблюдаемых, каждая из которых выполняет асинхронную операцию.

Но у меня есть ограничение на количество операций, которые я могу выполнять одновременно. Одновременно должно быть не более n операций. (n в идеале 5, но 1 тоже принимается)

К сожалению, Zip, похоже, запускает все операции, не дожидаясь их завершения. Есть ли способ ограничить это поведение?

1 Ответ

0 голосов
/ 23 мая 2019

Может быть, вы могли бы использовать комбинацию оператора window() и zip()?Примерно так:

public static void main(String[] args) {
    Flowable<Integer>[] flowables = new Flowable[] {
            Flowable.just(1), Flowable.just(2), Flowable.just(3), Flowable.just(4), Flowable.just(5),
            Flowable.just(6), Flowable.just(7), Flowable.just(8), Flowable.just(9)
    };

    Flowable.fromArray(flowables)
            .window(5)
            .flatMap(f -> Flowable.zip(f, objects -> Arrays.stream(objects).map(Object::toString).collect(joining("")))
                                  .flatMapSingle(Single::just))
            .subscribe(s -> System.out.println("received: " + s));


    Flowable.timer(10, SECONDS) // Just to block the main thread for a while
            .blockingSubscribe();
}

window() разделит текучие элементы на Flowable из Flowable с.Каждый текучий излучает только 5 элементов (это может быть количество операций, которые вы хотите).В этом примере zip() просто объединяет данные целые числа.Будет напечатано:

received: 12345
received: 6789

Надеюсь, это поможет.

...