Может ли Flux of Project Reactor обрабатывать сообщения одно за другим - PullRequest
0 голосов
/ 10 июля 2020

Я пытаюсь обработать список чисел, например, от 1 до 10, одно за другим, используя Reactor Flux, и есть API / double , который просто удваивает входящие Integer (1 -> 2, 4 -> 8 ...), однако у этого API есть проблемы с производительностью, ответ всегда занимает 2 секунды. При использовании limitRate(1) я ожидал, что Reactor обрабатывает запросы один за другим следующим образом:

2020-01-01 00:00:02 - 2
2020-01-01 00:00:04 - 4
2020-01-01 00:00:06 - 6
2020-01-01 00:00:08 - 8
2020-01-01 00:00:10 - 10
...

Но на самом деле Reactor запускает все запросы сразу:

2020-01-01 00:00:02 - 6
2020-01-01 00:00:02 - 10
2020-01-01 00:00:02 - 2
2020-01-01 00:00:02 - 4
2020-01-01 00:00:02 - 8
...

Вот код

Flux.range(1, 10).limitRate(1)
                .flatMap(i -> webClient.get().uri("http://localhost:10001/double?integer={int}", i).exchange()
                        .flatMap(resp -> resp.bodyToMono(Integer.class)))
                .subscribe(System.out::println);
Thread.sleep(10000);

Кажется, limitRate работает не так, как я ожидал, что пошло не так? Есть ли способ обрабатывать запросы один за другим с помощью Reactor? Заранее спасибо.

Ответы [ 3 ]

2 голосов
/ 11 июля 2020

.flatMap здесь не работает, поскольку он охотно подписывается на внутренние потоки, то есть он не будет ждать, пока внутренний поток выдаст onComplete, прежде чем подписаться на следующий поток. Вот почему все ваши звонки совершаются одновременно. Он работает в режиме receive->dispatch->receive->dispatch.

Reactor предоставляет перегруженную версию flatMap, в которой вы можете указать коэффициент параллелизма как .flatMap(innerstream, concurrency). Этот фактор ограничивает количество потоков, на которые будет подписана flatMap. Если указано 5, flatMap может подписаться не более чем на 5 внутренних потоков. Как только этот предел достигнут, у него есть , чтобы дождаться, пока внутренний поток выдаст onComplete, прежде чем он подпишется на следующий.

В вашем случае вы можете либо установить его к 1 или используйте .concatMap(). concatMap() - это в точности flatMap с concurrency = 1. В основном он работает в режиме receive->dispatch->wait->receive->dispatch->wait.

Некоторое время назад я написал сообщение, в котором подробно объяснял, как работает flatMap, потому что я думаю, что многие люди используют его, не понимая его внутреннего устройства. Ссылаться на статью можно здесь

2 голосов
/ 10 июля 2020

Рассмотрите возможность использования concatMap вместо:

/**
 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux}, sequentially and
 * preserving order using concatenation.
 * <p>
 * There are three dimensions to this operator that can be compared with
 * {@link #flatMap(Function) flatMap} and {@link #flatMapSequential(Function) flatMapSequential}:
 * <ul>
 *     <li><b>Generation of inners and subscription</b>: this operator waits for one
 *     inner to complete before generating the next one and subscribing to it.</li>
 *     <li><b>Ordering of the flattened values</b>: this operator naturally preserves
 *     the same order as the source elements, concatenating the inners from each source
 *     element sequentially.</li>
 *     <li><b>Interleaving</b>: this operator does not let values from different inners
 *     interleave (concatenation).</li>
 * </ul>
 *
 * <p>
 * Errors will immediately short circuit current concat backlog.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/concatMap.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
 *
 * @param mapper the function to transform this sequence of T into concatenated sequences of V
 * @param <V> the produced concatenated type
 *
 * @return a concatenated {@link Flux}
 */
public final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>>
        mapper) {

Обратите внимание на фразу sequentially and preserving order using concatenation.. Мне кажется, что вы ищете.

0 голосов
/ 11 июля 2020

Вдохновленный ответом Артема Билана, я обнаружил, что flatMapSequential лучше для моего случая, поскольку flatMapSequential принимает второй параметр как maxConcurrency, так что можно обрабатывать сообщения не одно за другим, а два раза за раз. и et c. Спасибо Артему Билану и Прашанту Панди за ответы, очень помогли.

...