У меня есть потоки входящих событий, которые необходимо обогащать, а затем обрабатывать параллельно по мере их поступления.
Я думал, что Project Reactor был сделан на заказ для работы, но в моих тестах всекажется, что обработка выполняется последовательно.
Вот некоторый тестовый код:
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
.map(i-> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.take(3)
// .subscribeOn(Schedulers.elastic());
// .subscribeOn(Schedulers.newParallel("test"));
// .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()),
System.out::println,
()-> System.out.println("Done"));
System.out.println("DONE AND DONE");
Я пытался раскомментировать каждую из закомментированных строк, однако в каждом случае вывод указывает, что один и тот же поток являетсяиспользуется для обработки всех событий
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done
(Единственное отличие состоит в том, что без планировщиков они запускаются в потоке подписки, тогда как с любым из исполнителей все они выполняются в одном потоке, которыйэто не тема подписки.)
Чего мне не хватает?
К вашему сведению, существует метод "сна":
public static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("Exiting");
}
}