Ожидание завершения ParallelFlux - PullRequest
2 голосов
/ 24 марта 2020

Я создал ParallelFlux, а затем использовал .sequential(), ожидая, что в этот момент я смогу подсчитать или «уменьшить» результаты параллельных вычислений. Кажется, проблема в том, что параллельные потоки запускаются, и их ничего не ждет.

У меня есть что-то, что нужно работать, используя CountDownLatch, но я не думаю, что мне нужно это делать.

TL; DR - Я не могу получить результат для распечатки для этого кода:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);

1 Ответ

3 голосов
/ 25 марта 2020

При выполнении этого кода в основном, это хорошая демонстрация асинхронной природы вещей. Операция выполняется в потоке из планировщика elasti c, и подписка запускает асинхронную обработку, поэтому она немедленно возвращается.

Существует два способа синхронизации конца приложения с концом Flux :

напечатайте в doOnNext и используйте blockLast()

Блочные * методы обычно используются в основном и тестах, где нет другого выбора, кроме как вернуться назад к блокирующей модели (ie, потому что в противном случае тест / main завершился бы до окончания обработки).

Мы переключаем «побочный эффект» печати каждого испускаемого элемента в doOnNext, что посвященный такого рода случаях использования. Затем мы блокируем до завершения потока, печатая blockLast().

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .blockLast();

в subscribe и используем CountDownLatch в doFinally

Это немного более замысловато, но позволяет вам на самом деле использовать подписку, если вы хотите изучить, как работает этот метод.

Мы просто добавляем doFinally, который срабатывает при любом сигнале завершения (отмена, ошибка или завершение). Мы используем CountDownLatch для блокировки основного потока, который будет асинхронно отсчитываться изнутри doFinally.

CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);
...