Почему Reactor's Mono.fromCompletionStage медленнее простого CompletableFuture? - PullRequest
1 голос
/ 22 апреля 2020

У меня есть простой фрагмент кода, который «обрабатывает» данные в фоновом режиме и после каждого элемента nth регистрирует общее время, потраченное на последние n элементы:

class BackgroundWorker implements AutoCloseable {
  private final ExecutorService thread = Executors.newSingleThreadExecutor();
  private final int reportEvery;
  private int processed;
  private LocalTime begin;

  BackgroundWorker(int reportEvery) {
    this.reportEvery = reportEvery;
  }

  CompletableFuture<Boolean> process(int item) {
    var future = new CompletableFuture<Boolean>();
    thread.submit(() ->  {
      try {
        if (processed == 0) {
          begin = LocalTime.now();
        }
        if (++processed == reportEvery) {
          System.out.format("Processed %d items in %dms%n",
              processed, ChronoUnit.MILLIS.between(begin, LocalTime.now()));
          processed = 0;
        }
        future.complete(true);
      } catch (Exception ex) {
        future.complete(false);
      }
    });
    return future;
  }

  @Override
  public void close() {
    thread.shutdownNow();
  }
}

Затем я иметь Flux, который подает данные в BackgroundWorker, считая CompletableFuture с выполненными успешно:

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100000).boxed());
try (var worker = new BackgroundWorker(10000)) {
  int successCount = numbers
      .map(worker::process)
      .map(future -> future.thenApply(success -> success ? 1 : 0))
      .reduce(
          CompletableFuture.completedFuture(0),
          (acc, curr) -> acc.thenCombine(curr, Integer::sum))
      .block()
      .join();

  System.out.println("Done; success: " + successCount);
}

И тот же кусок кода, но теперь вместо него используется Mono.fromCompletionStage:

int successCount = numbers
    .map(worker::process)
    .map(Mono::fromCompletionStage)
    .map(mono -> mono.map(success -> success ? 1 : 0))
    .reduce(
        Mono.just(0),
        (acc, curr) -> acc.zipWith(curr, Integer::sum))
    .block()
    .block();

Первый, который использует фьючерсы, напечатает что-то вроде:

Processed 10000 items in 48ms
Processed 10000 items in 17ms
Processed 10000 items in 10ms
Processed 10000 items in 8ms
Processed 10000 items in 9ms
Processed 10000 items in 5ms
Processed 10000 items in 5ms
Processed 10000 items in 4ms
Processed 10000 items in 3ms
Processed 10000 items in 4ms
Done; success: 100000

Но версия, использующая Mono.fromCompletionStage, печатает:

Processed 10000 items in 138ms
Processed 10000 items in 253ms
Processed 10000 items in 327ms
Processed 10000 items in 477ms
Processed 10000 items in 315ms
Processed 10000 items in 379ms
Processed 10000 items in 448ms
Processed 10000 items in 509ms
Processed 10000 items in 595ms
Processed 10000 items in 668ms
Done; success: 100000

Почему использование Mono вместо CompletableFuture сильно снижает производительность?

1 Ответ

1 голос
/ 23 апреля 2020

Кажется, что Mono s - это то, что занимает больше всего времени и как-то влияет на выполнение. Вероятно, потому, что подобное сжатие каждый раз создает новый экземпляр MonoZip.

Но на этом этапе вам не нужно использовать уменьшение и сжатие. Это больше идиоматических c до flatMap моно, получите Flux<Integer>, которое вы уменьшите без создания промежуточного мусора.

Кроме того, поскольку фьючерсы в основном начинают обработку при создании, вы можете сделать еще проще concatMap (меньше накладных расходов, и необходимость ждать завершения каждого моно на данном этапе на самом деле не имеет значения, поскольку все фьючерсы уже работают в фоновом режиме):

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100_000).boxed());
try (BackgroundWorker worker = new BackgroundWorker(10000)) {
    int successCount = numbers
            .map(worker::process)
            .concatMap(future -> Mono.fromCompletionStage(future))
            .map(success -> success ? 1 : 0)
            .reduce(0, Integer::sum)
            .block();

    System.out.println("Done; success: " + successCount);
}

Вы можете даже немного уменьшите накладные расходы, избегая преобразования из логического значения в int и делая это при уменьшении:

.reduce(0, (acc, bool) -> bool ? acc + 1 : acc)
...