Отрегулируйте асинхронное моно по количеству одновременных задач (не по времени) - PullRequest
1 голос
/ 13 апреля 2019

Скажем, у меня есть метод, который принимает параметр и возвращает Mono<Integer>, который завершается асинхронно.Например:

Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);

Mono<Integer> fetch(String a) {
  return Mono.create(em -> {
    scheduledExecutorService.schedule(() -> em.next(a + " result"), 
      10 + random.nextInt(50), TimeUnit.MILLISECONDS);
  });
}

Скажем, у меня есть Flux<String>, который я могу использовать в методе fetch, описанном выше, и который может содержать много элементов.

Есть ли способ, которым я могуМожно ли гарантировать, что метод вызывается параллельно, но ограничить количество одновременных вызовов заранее определенным номером?

Например, 4 в приведенном выше примере, в то время как у меня есть 16 доступных потоков - так что я всегда оставляю 12 запасных от этогоперспектива.

1 Ответ

2 голосов
/ 13 апреля 2019

Предполагая, что «feed in» означает, что вы используете flux.flatMap(this::fetch), вы можете установить параллелизм flatMap, вызвав вместо этого flux.flatMap(this::fetch, 4).

Кроме того, ваш код содержит две ошибки компиляции:

  1. тип возврата Mono<Integer> не соответствует типу предмета, который вы даете в раковину (a + " result").Я предполагаю, что вы имели в виду Mono<String>
  2. MonoSink не имеет .next метод.Я предполагаю, что вы имели в виду .success

Учитывая все это, вот пример:

    private Flux<String> fetchAll() {
        return Flux.range(0, 50)
                .map(i -> Integer.toString(i))
                .flatMap(this::fetch, 4);

    }

    private Mono<String> fetch(String a) {
        return Mono.create(em ->
                scheduledExecutorService.schedule(() -> em.success(a + " result"),
                        10 + random.nextInt(50), TimeUnit.MILLISECONDS)
        );
    }

...