Примечание: версия reactor-core: 3.3.3.RELEASE. Связанная проблема: https://github.com/reactor/reactor-core/issues/1783, но мне не хватает знаний, чтобы в ней разобраться.
Сценарий. Моя программа делает HTTP-запросы к нескольким конечным точкам, используя отдельные экземпляры WebClient. Если обе конечные точки недоступны, оба запроса отменяются, но подписчик получает ошибку только по одному из них. Ниже приведён мой тестовый код:
package org.example.webclient;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import java.net.ConnectException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Slf4j
public class WebClientTest {
private WebClient webClient() {
TcpClient tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.doOnConnected(connection ->
connection.addHandlerLast(new ReadTimeoutHandler(10000))
.addHandlerLast(new WriteTimeoutHandler(10000))
.addHandlerLast(new LoggingHandler(LogLevel.TRACE)));
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_XML_VALUE)
.build();
}
private Flux<String> getEndpointsFlux() {
List<String> endpoints = new ArrayList<>(Arrays.asList("http://localhost:8091",
"http://localhost:8092"));
return Flux.fromIterable(endpoints);
}
private Flux<String> sendRequest(String endpoint) {
return this.webClient().post()
.uri(URI.create(endpoint))
.body(Mono.just("test"), String.class)
.exchange()
.flatMap(clientResponse -> {
HttpStatus statusCode = clientResponse.statusCode();
if (statusCode.is4xxClientError() || statusCode.is5xxServerError()) {
String response = null;
log.error("Client/Server Error while connecting to endpoint {} code {} ",
endpoint, statusCode);
return Mono.error(new SendFailedException(endpoint, statusCode.toString()));
}
return clientResponse.bodyToMono(String.class);
})
.onErrorResume(ChannelException.class, e -> {
log.error("Timeout Exception while connecting to endpoint {} {}",
endpoint, e.getMessage());
return Mono.error(new SendFailedException(endpoint, "timeout"));
})
.onErrorResume(ConnectException.class, e -> {
log.error("Connection Exception while connecting to endpoint {} {}",
endpoint, e.getMessage());
return Mono.error(new SendFailedException(endpoint, "connect_failed"));
})
.doOnCancel(()-> {
log.error("Cancelled client {}", endpoint);
Mono.error(new SendFailedException(endpoint, "client_cancelled"));
})
.flux();
}
private void onCompleted(String response) {
log.info("On Completed {}", response);
}
private void onFailed(Throwable e) {
log.info("On Failed {} {}", ((SendFailedException)e).endpoint, ((SendFailedException)e).getMessage());
}
public static void main(String... args){
WebClientTest webClientTest = new WebClientTest();
webClientTest.getEndpointsFlux()
.flatMap(webClientTest::sendRequest)
.subscribe(webClientTest::onCompleted, webClientTest::onFailed);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class SendFailedException extends Exception {
String endpoint;
String message;
SendFailedException(String endpoint, String message) {
super(message);
this.endpoint = endpoint;
this.message = message;
}
}
}
Как видно из кода, если обе конечные точки недоступны, я ожидаю, что onFailed будет вызван дважды. Но он вызывается только один раз. Вывод программы приведён ниже.
11:29:59.103 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:29:59.172 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
11:29:59.218 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
11:29:59.225 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
11:29:59.226 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
11:29:59.229 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
11:29:59.231 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
11:29:59.232 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
11:29:59.233 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
11:29:59.235 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
11:29:59.236 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
11:29:59.236 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\wjose\AppData\Local\Temp (java.io.tmpdir)
11:29:59.237 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
11:29:59.239 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3790077952 bytes
11:29:59.240 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
11:29:59.242 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
11:29:59.242 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
11:29:59.246 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
11:29:59.246 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
11:29:59.946 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
11:29:59.948 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
11:30:00.025 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-tcp, daemon=true, selectCount=4, workerCount=4}
11:30:00.025 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default ConnectionProvider: reactor.netty.resources.PooledConnectionProvider@1d8d30f7
11:30:01.379 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
11:30:01.379 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
11:30:01.410 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=4, workerCount=4}
11:30:01.410 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.PooledConnectionProvider@173ed316
11:30:01.416 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
11:30:01.418 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
11:30:01.425 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 8
11:30:01.453 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
11:30:01.453 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
11:30:01.463 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
11:30:01.463 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
11:30:01.474 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
11:30:02.234 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [4738a206] HTTP POST http://localhost:8091
11:30:02.242 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new client pool [PoolFactory {maxConnections=500, pendingAcquireMaxCount=-1, pendingAcquireTimeout=45000, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false}] for [localhost:8091]
11:30:02.307 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20752 (auto-detected)
11:30:02.873 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 7c:2a:31:ff:fe:e3:91:99 (auto-detected)
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 8
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 8
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
11:30:02.928 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
11:30:02.929 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
11:30:02.944 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
11:30:02.949 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [1187c9e8] HTTP POST http://localhost:8092
11:30:02.949 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new client pool [PoolFactory {maxConnections=500, pendingAcquireMaxCount=-1, pendingAcquireTimeout=45000, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false}] for [localhost:8092]
11:30:02.960 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x947cb6eb] Created a new pooled channel, now 1 active connections and 0 inactive connections
11:30:02.960 [reactor-http-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0xc4ac08e3] Created a new pooled channel, now 1 active connections and 0 inactive connections
11:30:03.019 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
11:30:03.020 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
11:30:03.021 [reactor-http-nio-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@470a0273
11:30:03.033 [reactor-http-nio-2] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x947cb6eb] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
11:30:03.033 [reactor-http-nio-1] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0xc4ac08e3] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
11:30:04.061 [reactor-http-nio-1] ERROR org.example.webclient.WebClientTest - Connection Exception while connecting to endpoint http://localhost:8091 Connection refused: no further information: localhost/127.0.0.1:8091
11:30:04.061 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Connection Exception while connecting to endpoint http://localhost:8092 Connection refused: no further information: localhost/127.0.0.1:8092
11:30:04.063 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Cancelled client http://localhost:8091
11:30:04.063 [reactor-http-nio-2] ERROR org.example.webclient.WebClientTest - Cancelled client http://localhost:8092
11:30:04.063 [reactor-http-nio-2] INFO org.example.webclient.WebClientTest - On Failed connect_failed
11:30:04.066 [reactor-http-nio-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
org.example.webclient.WebClientTest$SendFailedException: connect_failed
at org.example.webclient.WebClientTest.lambda$sendRequest$3(WebClientTest.java:72)
at reactor.core.publisher.Mono.lambda$onErrorResume$31(Mono.java:3351)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
at reactor.core.publisher.Operators.error(Operators.java:185)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1944)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:197)
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:101)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:346)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:468)
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381)
at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206)
at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152)
at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:282)
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:232)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
11:30:04.067 [reactor-http-nio-1] WARN io.netty.util.concurrent.DefaultPromise - An exception was thrown by reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete()
reactor.core.Exceptions$BubblingException: org.example.webclient.WebClientTest$SendFailedException: connect_failed
at reactor.core.Exceptions.bubble(Exceptions.java:170)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:590)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:843)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:979)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
at reactor.core.publisher.Operators.error(Operators.java:185)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
at reactor.core.publisher.Operators.error(Operators.java:185)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1944)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:197)
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:101)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:346)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:468)
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381)
at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206)
at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152)
at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183)
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:282)
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:232)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.example.webclient.WebClientTest$SendFailedException: connect_failed
at org.example.webclient.WebClientTest.lambda$sendRequest$3(WebClientTest.java:72)
at reactor.core.publisher.Mono.lambda$onErrorResume$31(Mono.java:3351)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88)
... 42 common frames omitted
Это проблема netty при выполнении параллельных запросов или проблема с Flux? В принципе, я могу переделать свой код так, чтобы использовать Mono, а не Flux. Но в конечном итоге всё равно придётся выполнять несколько веб-клиентов одновременно.