Как обрабатывать события Flux параллельно друг другу? - PullRequest
1 голос
/ 28 апреля 2019

У меня есть потоки входящих событий, которые необходимо обогащать, а затем обрабатывать параллельно по мере их поступления.

Я думал, что Project Reactor был сделан на заказ для работы, но в моих тестах всекажется, что обработка выполняется последовательно.

Вот некоторый тестовый код:

ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i-> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()), 
               System.out::println, 
               ()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

Я пытался раскомментировать каждую из закомментированных строк, однако в каждом случае вывод указывает, что один и тот же поток являетсяиспользуется для обработки всех событий

Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(Единственное отличие состоит в том, что без планировщиков они запускаются в потоке подписки, тогда как с любым из исполнителей все они выполняются в одном потоке, которыйэто не тема подписки.)

Чего мне не хватает?

К вашему сведению, существует метод "сна":

public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        System.out.println("Exiting");
    }
}

1 Ответ

1 голос
/ 28 апреля 2019

Один из способов параллельной обработки элементов - это использование .parallel / .runOn

flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here.  (e.g. .map, .flatMap, etc)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()

Операции блокировки (например, блокировка ввода-вывода или Thread.sleep) блокируют поток, в котором онивыполнены.Реактивные потоки не могут волшебным образом превратить блокирующий метод в неблокирующий метод.Поэтому вам необходимо убедиться, что методы блокировки выполняются на Scheduler, подходящем для операций блокировки (например, Schedulers.elastic()).

В приведенном выше примере, поскольку вы знаете, что вызываете операцию блокировки, вы можете использовать.runOn(Schedulers.elastic()).

В зависимости от варианта использования вы также можете использовать асинхронные операторы, такие как .flatMap в сочетании с .subscribeOn или .publishOn, чтобы делегировать определенные операции блокировки другой Scheduler, например описанные в проекте реактора документы .Например:

flux
    .flatMap(i -> Mono.fromCallable(() -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .subscribeOn(Schedulers.elastic()))

На самом деле, .flatMap также имеет перегруженный вариант, который принимает параметр concurrency, где вы можете ограничить максимальное количество внутренних последовательностей в полете.Это может быть использовано вместо .parallel в некоторых случаях.Однако не обычно работает для Flux.interval, поскольку Flux.interval не поддерживает нисходящие запросы, которые пополняются медленнее, чем тики.

...