Как выполнить многопоточную обработку файлов в Reactor - PullRequest
1 голос
/ 04 ноября 2019

Я пытаюсь обрабатывать несколько файлов параллельно, используя Reactor Flux. Основная рабочая нагрузка происходит при вызове flatMap, после чего поток преобразовывается и фильтруется.

Всякий раз, когда я пытаюсь подписаться на результирующий поток, основной поток завершает работу, прежде чем получит какие-либо значения.

Flux.fromStream(Files.list(Paths.get("directory"))
    .flatMap(path -> { 
        return Flux.create(sink -> {
            try (
                RandomAccessFile file = new RandomAccessFile(new File(path), "r");
                FileChannel fileChannel = file.getChannel()
            ) {
                // Process file into tokens
                sink.next(new Token(".."));
            } catch (IOException e) {
                sink.error(e);
            } finally {
                sink.complete();
            }
        }).subscribeOn(Schedulers.boundedElastic());
    })
    .map(token -> /* Transform tokens */)
    .filter(token -> /* Filter tokens*/)
    .subscribe(token -> /* Store tokens in list */)

Я ожидаю найти выходные данные конвейера обработки в моем списке,но программа сразу выходит. Во-первых, мне интересно, правильно ли я использую класс Flux, а во-вторых, как мне ждать завершения вызова подписки?

1 Ответ

1 голос
/ 04 ноября 2019

Я ожидаю найти выходные данные конвейера обработки в моем списке, но программа немедленно завершает работу.

Код, который у вас есть , устанавливает ваша реактивная цепочка в главном потоке, а затем ничего не делает ... в главном потоке. Таким образом, основной поток завершает свою работу, и поскольку потоки boundedElastic() являются потоками демонов, другие потоки не останавливают программу, поэтому она закрывается.

Вы можете увидеть то же поведение сболее простой пример:

Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
            .delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);

Конечно, вы можете вызвать newBoundedElastic("name", false), чтобы сделать его планировщиком с поддержкой non-daemon , но тогда вам придется отслеживать его и вызывать disposeкогда вы закончите, это действительно просто перевернет проблему (программа будет работать бесконечно, пока вы не утилизируете планировщик).

Быстрое грязное 'n' грязное решение - просто заблокировать последний элемент Fluxкак последняя строка в вашей программе - поэтому, если мы добавим:

f.blockLast();

..., тогда программа ожидает, пока последний элемент не будет выпущен, прежде чем выйти, и у нас будет поведение, которое мы преследуем.

Для простого доказательства концепции это хорошо. Это не идеально в «производственном» коде. Во-первых, «отсутствие блокировки» - это общее правило в реактивном коде, поэтому, если у вас есть такие блокировки вызовов, трудно решить, намечено это или нет. Если вы добавили другие цепочки, а также хотели, чтобы они завершились, вам нужно будет добавить блокирующие вызовы для каждого из них. Это грязно и не совсем устойчиво.

Лучше было бы использовать CountDownLatch:

CountDownLatch cdl = new CountDownLatch(1);

Flux.just(1, 2, 3, 4, 5)
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> cdl.countDown())
        .subscribe(System.out::println);

cdl.await();

Это имеет преимущество, заключающееся в отсутствии явной блокировки, а также способности обрабатыватьболее одного издателя одновременно (если вы устанавливаете начальное значение выше 1.) Это также является подходом, который я считаю рекомендованным в целом для такого рода вещей - так что, если вы хотите наиболее широко принятое решение, это, вероятно, так.

Тем не менее, я предпочитаю Phaser для всех примеров, когда вам нужно подождать нескольких издателей, а не одного - он работает аналогично CountdownLatch, но может динамически register() и deregister(). Это означает, что вы можете создать один фазер, а затем легко зарегистрировать в нем несколько издателей, не требуя изменения исходного значения, например:

Phaser phaser = new Phaser(1);

Flux.just(1, 2, 3, 4, 5)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

phaser.arriveAndAwaitAdvance();

(Вы, конечно, можете обернуть логику onSubscribe и doFinally вотдельный метод тоже при необходимости.)

...