У меня есть простой фрагмент кода, который «обрабатывает» данные в фоновом режиме и после каждого элемента nth
регистрирует общее время, потраченное на последние n
элементы:
class BackgroundWorker implements AutoCloseable {
private final ExecutorService thread = Executors.newSingleThreadExecutor();
private final int reportEvery;
private int processed;
private LocalTime begin;
BackgroundWorker(int reportEvery) {
this.reportEvery = reportEvery;
}
CompletableFuture<Boolean> process(int item) {
var future = new CompletableFuture<Boolean>();
thread.submit(() -> {
try {
if (processed == 0) {
begin = LocalTime.now();
}
if (++processed == reportEvery) {
System.out.format("Processed %d items in %dms%n",
processed, ChronoUnit.MILLIS.between(begin, LocalTime.now()));
processed = 0;
}
future.complete(true);
} catch (Exception ex) {
future.complete(false);
}
});
return future;
}
@Override
public void close() {
thread.shutdownNow();
}
}
Затем я иметь Flux
, который подает данные в BackgroundWorker
, считая CompletableFuture
с выполненными успешно:
Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100000).boxed());
try (var worker = new BackgroundWorker(10000)) {
int successCount = numbers
.map(worker::process)
.map(future -> future.thenApply(success -> success ? 1 : 0))
.reduce(
CompletableFuture.completedFuture(0),
(acc, curr) -> acc.thenCombine(curr, Integer::sum))
.block()
.join();
System.out.println("Done; success: " + successCount);
}
И тот же кусок кода, но теперь вместо него используется Mono.fromCompletionStage
:
int successCount = numbers
.map(worker::process)
.map(Mono::fromCompletionStage)
.map(mono -> mono.map(success -> success ? 1 : 0))
.reduce(
Mono.just(0),
(acc, curr) -> acc.zipWith(curr, Integer::sum))
.block()
.block();
Первый, который использует фьючерсы, напечатает что-то вроде:
Processed 10000 items in 48ms
Processed 10000 items in 17ms
Processed 10000 items in 10ms
Processed 10000 items in 8ms
Processed 10000 items in 9ms
Processed 10000 items in 5ms
Processed 10000 items in 5ms
Processed 10000 items in 4ms
Processed 10000 items in 3ms
Processed 10000 items in 4ms
Done; success: 100000
Но версия, использующая Mono.fromCompletionStage
, печатает:
Processed 10000 items in 138ms
Processed 10000 items in 253ms
Processed 10000 items in 327ms
Processed 10000 items in 477ms
Processed 10000 items in 315ms
Processed 10000 items in 379ms
Processed 10000 items in 448ms
Processed 10000 items in 509ms
Processed 10000 items in 595ms
Processed 10000 items in 668ms
Done; success: 100000
Почему использование Mono
вместо CompletableFuture
сильно снижает производительность?