Как запланировать несколько потоков параллельно и ограничить события onNext и onRequest - PullRequest
0 голосов
/ 17 апреля 2020

Я хочу объединить несколько Flux одного типа вместе. Во время подписки они должны выполняться параллельно. Метод подписки должен быть в состоянии ограничить количество запросов.

Я экспериментировал с Flux.merge(..) и Flux.concat(..). Похоже, что последний обеспечивает принудительное создание последовательного запроса даже при публикации на параллельном планировщике, когда первый запрашивает все элементы с нетерпением и учитывает только метод take(n) для событий onNext, но не для событий onRequest.

Я подготовил минимальный пример. Три Flux, которые должны выполнять тяжелую работу ввода-вывода на Schedulers.boundedElastic(). Каждый регистрирует свой onRequest Событие.

Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));

Подход с помощью Flux.concat и take(2)

Flux.concat(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

Результат:

00:16:11.980   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onNext 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onRequest 2
00:16:15.982   [oundedElastic-3] d.f.B.Application                        : onNext 2

Два onRequest и два onNext События распространяются. Запросы задерживаются на две секунды , но там, где ожидается, параллельно .

Подход с Flux.map и take(2)

Flux.merge(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

Результат

00:21:58.301   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

Три onRequest события генерируются, хотя ожидается только два события . Метод take(n) только ограничил испускаемые события onNext. Выполнение было выполнено параллельно, как и ожидалось.

Ожидаемый результат для неизвестного подхода

00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

Только два onRequest события отправляются и завершаются

Вопрос

Можно ли объединить эти три Flux таким образом, чтобы они могли быть вычислены одновременно, но ограничены (take(n)?) В их поведении запроса? В моем реальном сценарии запрос приводит к тому, что WebClient вызывает WebService, что запускает трудоемкую задачу ввода-вывода. Поэтому, хотя мое приложение только получает два события onNext, третье отправленное onRequest приводит к запросу, который переходит в нирвану.

1 Ответ

0 голосов
/ 17 апреля 2020

Я также новичок в проекте-реакторе, но я нашел этот обходной путь, который некоторые могли бы назвать хаком:

Flux.fromIterable(Lists.partition(Arrays.asList(f1, f2, f3), 2))
        .take(1)
        .flatMap(partition -> Flux.merge(partition)
                .doOnNext(i -> LOGGER.info("onNext {}", i)))
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

Он распечатал это для меня:

2020-04-17 01:07:37.658  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 1
2020-04-17 01:07:37.659  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 2
2020-04-17 01:07:39.664  INFO 59488 --- [oundedElastic-3] com.example.demo.SomeService1            : onNext 2
2020-04-17 01:07:39.667  INFO 59488 --- [oundedElastic-2] com.example.demo.SomeService1            : onNext 1
...