Spring Webflux Reactor WebClient onErrorDropped - PullRequest
0 голосов
/ 23 апреля 2020

Примечание: активная зона реактора: 3.3.3. Выпуск Связанная проблема: https://github.com/reactor/reactor-core/issues/1783, но я недостаточно умен, чтобы понять это.

Сценарий. Моя программа делает http-запросы к нескольким конечным точкам, используя отдельные экземпляры веб-клиента. Если обе конечные точки не работают, оба соединения отменяются, но на подписчике фиксируется только одно соединение. Ниже приведен мой тестовый код

package org.example.webclient;

import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

import java.net.ConnectException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class WebClientTest {

    private WebClient webClient() {
        TcpClient tcpClient = TcpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .doOnConnected(connection ->
                        connection.addHandlerLast(new ReadTimeoutHandler(10000))
                                .addHandlerLast(new WriteTimeoutHandler(10000))
                                .addHandlerLast(new LoggingHandler(LogLevel.TRACE)));
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
                .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_XML_VALUE)
                .build();
    }

    private Flux<String> getEndpointsFlux() {
        List<String> endpoints = new ArrayList<>(Arrays.asList("http://localhost:8091",
                "http://localhost:8092"));
        return Flux.fromIterable(endpoints);
    }

    private Flux<String> sendRequest(String endpoint) {
        return this.webClient().post()
                .uri(URI.create(endpoint))
                .body(Mono.just("test"), String.class)
                .exchange()
                .flatMap(clientResponse -> {
                    HttpStatus statusCode = clientResponse.statusCode();
                    if (statusCode.is4xxClientError() || statusCode.is5xxServerError()) {
                        String response = null;
                        log.error("Client/Server Error while connecting to endpoint {} code {} ",
                                endpoint, statusCode);
                        return Mono.error(new SendFailedException(endpoint, statusCode.toString()));
                    }
                    return clientResponse.bodyToMono(String.class);
                })
                .onErrorResume(ChannelException.class, e -> {
                    log.error("Timeout Exception while connecting to endpoint {} {}",
                            endpoint, e.getMessage());
                    return Mono.error(new SendFailedException(endpoint, "timeout"));
                })
                .onErrorResume(ConnectException.class, e -> {
                    log.error("Connection Exception while connecting to endpoint {} {}",
                            endpoint, e.getMessage());
                    return Mono.error(new SendFailedException(endpoint, "connect_failed"));
                })
                .doOnCancel(()-> {
                    log.error("Cancelled client {}", endpoint);
                    Mono.error(new SendFailedException(endpoint, "client_cancelled"));
                })
                .flux();
    }

    private void onCompleted(String response) {
        log.info("On Completed {}", response);
    }

    private void onFailed(Throwable e) {
        log.info("On Failed {} {}", ((SendFailedException)e).endpoint, ((SendFailedException)e).getMessage());
    }

    public static void main(String... args){
        WebClientTest webClientTest = new WebClientTest();
        webClientTest.getEndpointsFlux()
                .flatMap(webClientTest::sendRequest)
                .subscribe(webClientTest::onCompleted, webClientTest::onFailed);
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class SendFailedException extends Exception {
        String endpoint;
        String message;
        SendFailedException(String endpoint, String message) {
            super(message);
            this.endpoint = endpoint;
            this.message = message;
        }
    }
}

. Как вы можете прочитать из кода, если обе конечные точки не работают, я ожидаю дважды вызвать onFailed . Но это называется только один раз. Прикрепленный ответ ниже.

11:29:59.103 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:29:59.172 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
11:29:59.218 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
11:29:59.225 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
11:29:59.226 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
11:29:59.229 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
11:29:59.231 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
11:29:59.232 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
11:29:59.233 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
11:29:59.236 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
11:29:59.236 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\wjose\AppData\Local\Temp (java.io.tmpdir)
11:29:59.237 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
11:29:59.239 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3790077952 bytes
11:29:59.240 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
11:29:59.242 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
11:29:59.242 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
11:29:59.246 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
11:29:59.246 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
11:29:59.946 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
11:29:59.948 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
11:30:00.025 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-tcp, daemon=true, selectCount=4, workerCount=4}
11:30:00.025 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default ConnectionProvider: reactor.netty.resources.PooledConnectionProvider@1d8d30f7
11:30:01.379 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
11:30:01.379 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
11:30:01.410 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=4, workerCount=4}
11:30:01.410 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.PooledConnectionProvider@173ed316
11:30:01.416 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
11:30:01.418 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
11:30:01.425 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 8
11:30:01.453 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
11:30:01.453 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
11:30:01.463 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
11:30:01.463 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
11:30:01.474 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
11:30:02.234 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [4738a206] HTTP POST http://localhost:8091
11:30:02.242 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new client pool [PoolFactory {maxConnections=500, pendingAcquireMaxCount=-1, pendingAcquireTimeout=45000, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false}] for [localhost:8091]
11:30:02.307 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20752 (auto-detected)
11:30:02.873 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 7c:2a:31:ff:fe:e3:91:99 (auto-detected)
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 8
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 8
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
11:30:02.949 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1187c9e8] HTTP POST http://localhost:8092
11:30:02.949 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new client pool [PoolFactory {maxConnections=500, pendingAcquireMaxCount=-1, pendingAcquireTimeout=45000, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false}] for [localhost:8092]
11:30:02.960 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x947cb6eb] Created a new pooled channel, now 1 active connections and 0 inactive connections
11:30:02.960 [reactor-http-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xc4ac08e3] Created a new pooled channel, now 1 active connections and 0 inactive connections
11:30:03.019 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
11:30:03.020 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
11:30:03.021 [reactor-http-nio-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@470a0273
11:30:03.033 [reactor-http-nio-2] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x947cb6eb] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
11:30:03.033 [reactor-http-nio-1] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xc4ac08e3] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
11:30:04.061 [reactor-http-nio-1] ERROR org.example.webclient.WebClientTest - Connection Exception while connecting to endpoint http://localhost:8091 Connection refused: no further information: localhost/127.0.0.1:8091
11:30:04.061 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Connection Exception while connecting to endpoint http://localhost:8092 Connection refused: no further information: localhost/127.0.0.1:8092
11:30:04.063 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Cancelled client http://localhost:8091
11:30:04.063 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Cancelled client http://localhost:8092
11:30:04.063 [reactor-http-nio-2] INFO org.example.webclient.WebClientTest - On Failed connect_failed
11:30:04.066 [reactor-http-nio-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
org.example.webclient.WebClientTest$SendFailedException: connect_failed
    at org.example.webclient.WebClientTest.lambda$sendRequest$3(WebClientTest.java:72)
    at reactor.core.publisher.Mono.lambda$onErrorResume$31(Mono.java:3351)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core.publisher.Operators.error(Operators.java:185)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1944)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:197)
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:101)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:346)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:468)
    at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381)
    at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206)
    at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152)
    at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:282)
    at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:232)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
11:30:04.067 [reactor-http-nio-1] WARN io.netty.util.concurrent.DefaultPromise - An exception was thrown by reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete()
reactor.core.Exceptions$BubblingException: org.example.webclient.WebClientTest$SendFailedException: connect_failed
    at reactor.core.Exceptions.bubble(Exceptions.java:170)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:590)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:843)
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:979)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core.publisher.Operators.error(Operators.java:185)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core.publisher.Operators.error(Operators.java:185)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1944)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:197)
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:101)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:346)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:468)
    at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381)
    at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206)
    at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152)
    at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
    at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:282)
    at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:232)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.example.webclient.WebClientTest$SendFailedException: connect_failed
    at org.example.webclient.WebClientTest.lambda$sendRequest$3(WebClientTest.java:72)
    at reactor.core.publisher.Mono.lambda$onErrorResume$31(Mono.java:3351)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88)
    ... 42 common frames omitted

Это проблема с netty, делающим параллельные запросы или проблема с потоком? Фактически, я могу переделать мой код, чтобы использовать моно, а не Flux. Но в конечном итоге это приводит к одновременному выполнению нескольких веб-клиентов.

...