Повторная попытка при отправке запроса через WebClient «блокирует» других подписчиков - PullRequest
0 голосов
/ 05 марта 2019

Я открыл проблему в github с ядром Reactor, но, возможно, это не их ответственность, поэтому никто мне не помог.

https://github.com/reactor/reactor-core/issues/1551 Введение общего потока:

  1. При получении запроса от контроллера создается Flux и публикуется серия сообщений.
  2. Несколько подписчиков подписываются на этот поток.
  3. onNext, каждый подписчик отправляет запрос API на определенный сервер.
  4. Добавлены запросы (фактически к ответу), промежуточная операция повторной попытки.

Scheduler requestsScheduler = Schedulers.single();
Scheduler retryingScheduler = Schedulers.newSingle("retrying_thread");

protected void hookOnNext(Message message) {
    getWebClient()
        .buildRequest(message)
        .publishOn(requestsScheduler)
        .retryWhen(c -> retryByPolicy(c))
        .publishOn(requestsScheduler )
        .subscribe(new ResponseSubscriber(this)); 
}

private Flux<Long> retryByPolicy(Flux<Throwable> companionErrors) {
    int firstRetryIndex = 1;
    return companionErrors.publishOn(retryingScheduler)
        .zipWith(Flux.range(firstRetryIndex,this.maxRetries),(err, retryIndex) -> {
            if (retryIndex >= maxRetries )
                throw Exceptions.propagate(new ReachedMaxRetriesException());
            else
                return retryIndex;
        })
        .flatMap(retryIndex -> Mono.delay(this.retryDelay(retryIndex), retryingScheduler))
        .publishOn(requestsScheduler );
}
  1. На этот запрос «ResponseSubscriber» обрабатывает входящий ответ и вызывает «MessageSubscriber» .subscription.request (1).

Ожидаемое поведение

Вся обработка повторных попыток асинхронна и параллельна другой обработке MessageSubscriber

Фактическое поведение

При повторной попытке все остальные MessageSubscriber ожидают полной обработки ответа конкретного MessageSubscriber, который находится в состоянии «повторной попытки».

Пример 1 - только когда MessageSubscriberB не удалось отправить запрос, MessageSubscriberA начинает отправку

03-03 14:25:56   [reactor-http-nio-3]   INFO   ControlelrRequest "1551615956502" changed its status from 'PENDING' to 'IN_PROGRESS'
03-03 14:25:56   [reactor-http-nio-3]   INFO   messageSubscriberA  Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #1
03-03 14:25:56   [reactor-http-nio-3]   INFO   messageSubscriberB Request: POST http://172.29.50.248:8080/REST/Command/1551615956412 of Message #1
03-03 14:25:56   [single-1]   INFO   responseSubscriberA Handling Web response of Message #1 
03-03 14:25:57   [retrying_thread-2]   WARN   In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #1
03-03 14:26:18   [retrying_thread-2]    WARN   In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #2
03-03 14:26:39   [retrying_thread-2]     WARN   In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #3
03-03 14:26:39   [single-1]  ERROR   Failed to send requests (Message #1) to the server 'ServerB'. got failed after reached max retries code
03-03 14:26:39   [single-1]   INFO  messageSubscriberA  Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #2
03-03 14:26:40   [single-1]   INFO   responseSubscriberA Handling Web response of Message #228 
03-03 14:26:40   [single-1]   INFO  messageSubscriberA  Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #3
03-03 14:26:40   [single-1]   INFO   responseSubscriberA Handling Web response of Message #3
03-03 14:26:40   [single-1]   INFO  messageSubscriberA  Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #4
03-03 14:26:41   [single-1]   INFO   responseSubscriberA Handling Web response of Message #4
03-03 14:26:41   [single-1]   WARN   ControlelrRequest "1551615956502" changed its status from 'IN_PROGRESS' to 'DONE' with failures.

Reactor Core версия Реактор-core.3.2.3.RELEASE * * тысяча тридцать два

версия JVM (например, Java-версия) Java-версия "11.0.1" 2018-10-16 LTS Java (TM) SE Runtime Environment 18.9 (сборка 11.0.1 + 13-LTS) Java HotSpot (TM) 64-битный сервер ВМ 18.9 (сборка 11.0.1 + 13-LTS, смешанный режим)

...