Я хочу объединить несколько Flux
одного типа вместе. Во время подписки они должны выполняться параллельно. Метод подписки должен быть в состоянии ограничить количество запросов.
Я экспериментировал с Flux.merge(..)
и Flux.concat(..)
. Похоже, что последний обеспечивает принудительное создание последовательного запроса даже при публикации на параллельном планировщике, когда первый запрашивает все элементы с нетерпением и учитывает только метод take(n)
для событий onNext
, но не для событий onRequest
.
Я подготовил минимальный пример. Три Flux
, которые должны выполнять тяжелую работу ввода-вывода на Schedulers.boundedElastic()
. Каждый регистрирует свой onRequest
Событие.
Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));
Подход с помощью Flux.concat
и take(2)
Flux.concat(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
Результат:
00:16:11.980 [oundedElastic-1] d.f.B.Application : onRequest 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onNext 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onRequest 2
00:16:15.982 [oundedElastic-3] d.f.B.Application : onNext 2
Два onRequest
и два onNext
События распространяются. Запросы задерживаются на две секунды , но там, где ожидается, параллельно .
Подход с Flux.map
и take(2)
Flux.merge(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
Результат
00:21:58.301 [oundedElastic-1] d.f.B.Application : onRequest 1
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
Три onRequest
события генерируются, хотя ожидается только два события . Метод take(n)
только ограничил испускаемые события onNext
. Выполнение было выполнено параллельно, как и ожидалось.
Ожидаемый результат для неизвестного подхода
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
Только два onRequest
события отправляются и завершаются
Вопрос
Можно ли объединить эти три Flux
таким образом, чтобы они могли быть вычислены одновременно, но ограничены (take(n)
?) В их поведении запроса? В моем реальном сценарии запрос приводит к тому, что WebClient вызывает WebService, что запускает трудоемкую задачу ввода-вывода. Поэтому, хотя мое приложение только получает два события onNext, третье отправленное onRequest приводит к запросу, который переходит в нирвану.