Я открыл проблему в github с ядром Reactor, но, возможно, это не их ответственность, поэтому никто мне не помог.
https://github.com/reactor/reactor-core/issues/1551
Введение общего потока:
- При получении запроса от контроллера создается Flux и публикуется серия сообщений.
- Несколько подписчиков подписываются на этот поток.
- onNext, каждый подписчик отправляет запрос API на определенный сервер.
- Добавлены запросы (фактически к ответу), промежуточная операция повторной попытки.
Scheduler requestsScheduler = Schedulers.single();
Scheduler retryingScheduler = Schedulers.newSingle("retrying_thread");
protected void hookOnNext(Message message) {
getWebClient()
.buildRequest(message)
.publishOn(requestsScheduler)
.retryWhen(c -> retryByPolicy(c))
.publishOn(requestsScheduler )
.subscribe(new ResponseSubscriber(this));
}
private Flux<Long> retryByPolicy(Flux<Throwable> companionErrors) {
int firstRetryIndex = 1;
return companionErrors.publishOn(retryingScheduler)
.zipWith(Flux.range(firstRetryIndex,this.maxRetries),(err, retryIndex) -> {
if (retryIndex >= maxRetries )
throw Exceptions.propagate(new ReachedMaxRetriesException());
else
return retryIndex;
})
.flatMap(retryIndex -> Mono.delay(this.retryDelay(retryIndex), retryingScheduler))
.publishOn(requestsScheduler );
}
- На этот запрос «ResponseSubscriber» обрабатывает входящий ответ и вызывает «MessageSubscriber» .subscription.request (1).
Ожидаемое поведение
Вся обработка повторных попыток асинхронна и параллельна другой обработке MessageSubscriber
Фактическое поведение
При повторной попытке все остальные MessageSubscriber ожидают полной обработки ответа конкретного MessageSubscriber, который находится в состоянии «повторной попытки».
Пример 1 - только когда MessageSubscriberB не удалось отправить запрос, MessageSubscriberA начинает отправку
03-03 14:25:56 [reactor-http-nio-3] INFO ControlelrRequest "1551615956502" changed its status from 'PENDING' to 'IN_PROGRESS'
03-03 14:25:56 [reactor-http-nio-3] INFO messageSubscriberA Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #1
03-03 14:25:56 [reactor-http-nio-3] INFO messageSubscriberB Request: POST http://172.29.50.248:8080/REST/Command/1551615956412 of Message #1
03-03 14:25:56 [single-1] INFO responseSubscriberA Handling Web response of Message #1
03-03 14:25:57 [retrying_thread-2] WARN In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #1
03-03 14:26:18 [retrying_thread-2] WARN In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #2
03-03 14:26:39 [retrying_thread-2] WARN In retrying policy of messageSubscriberB : trying to send the Message #1 to 'SerberB' had failed - trying #3
03-03 14:26:39 [single-1] ERROR Failed to send requests (Message #1) to the server 'ServerB'. got failed after reached max retries code
03-03 14:26:39 [single-1] INFO messageSubscriberA Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #2
03-03 14:26:40 [single-1] INFO responseSubscriberA Handling Web response of Message #228
03-03 14:26:40 [single-1] INFO messageSubscriberA Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #3
03-03 14:26:40 [single-1] INFO responseSubscriberA Handling Web response of Message #3
03-03 14:26:40 [single-1] INFO messageSubscriberA Request: POST http://localhost:8080/REST/Command/1551615956412 of Message #4
03-03 14:26:41 [single-1] INFO responseSubscriberA Handling Web response of Message #4
03-03 14:26:41 [single-1] WARN ControlelrRequest "1551615956502" changed its status from 'IN_PROGRESS' to 'DONE' with failures.
Reactor Core версия
Реактор-core.3.2.3.RELEASE * * тысяча тридцать два
версия JVM (например, Java-версия)
Java-версия "11.0.1" 2018-10-16 LTS
Java (TM) SE Runtime Environment 18.9 (сборка 11.0.1 + 13-LTS)
Java HotSpot (TM) 64-битный сервер ВМ 18.9 (сборка 11.0.1 + 13-LTS, смешанный режим)