Блокировка ввода / вывода в конвейере Project Reactor - PullRequest
1 голос
/ 29 января 2020

В следующем коде Project Reactor используется для распределения блокирующих операций ввода-вывода по ограниченному числу рабочих потоков:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

...

    List<Item> processItems(List<Item> items) {
        final int parallelDegree = 10;
        final Scheduler scheduler = Schedulers.newParallel("myScheduler", parallelDegree, true);

        return Flux.fromIterable(items)
            .parallel(parallelDegree)
            .runOn(scheduler)
            .map(this::doSomeBlockingIo)
            .sequential()
            .publishOn(Schedulers.immediate())
            .collectList()
            .block();

    ...

    Item doSomeBlockingIo(Item item) {
        // perform some non-deterministic, blocking I/O with side-effects
        ...
        return someNewItem;
    }

Код работает нормально как есть. Но действительно ли он надежен и идиоматичен c?

Примечание. Я проверил и ничего не вижу в документации по Project Reactor (включая JavaDocs), которая явно запрещает этот вариант использования.

Запрашиваемая у друга.

1 Ответ

1 голос
/ 29 января 2020

Это будет работать нормально, это надежно, но тот факт, что вы используете параллельный планировщик для блокировки работы ввода-вывода, не оптимален (и не особенно идиоматичен; когда кто-то из опытных в реакторе видит параллельный планировщик, они ожидают, что он работает без блокировки ввода-вывода.)

Лучшим подходом здесь было бы поменять ваш параллельный планировщик на ограниченный elasti c планировщик с выбранной вами заглушкой (10 в ваш пример) - это раскручивает и повторно использует вспомогательных работников по мере необходимости, вплоть до вашей максимальной.

...