Я пытаюсь делать асинхронные почтовые запросы через WebClient, так как это предпочтительный способ при использовании Springframework 5.0.
вот мой код сервера:
package com.example.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
@Slf4j
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@PostMapping(value = "/createCustomer", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = "text/plain")
public Customer sendStream(@RequestBody String number) throws InterruptedException {
Thread.sleep(40);
return new Customer(number, "random" + number);
}
@Data
@AllArgsConstructor
class Customer {
String id;
String name;
}
}
Код клиента, который у меня есть:
package com.example.demo2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.stream.IntStream;
@SpringBootApplication
@Slf4j
public class Demo2Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Demo2Application.class, args);
}
@Override
public void run(String... args) throws Exception {
WebClient webClient = WebClient.create("http://localhost:8080");
IntStream.range(0, 100000).forEach(i -> {
Flux.merge(
webClient
.post()
.uri("/createCustomer")
.body(BodyInserters.fromObject(String.valueOf(i)))
.retrieve().bodyToMono(String.class)
)
.parallel()
.runOn(Schedulers.single())
.subscribe(log::info);
});
}
}
И я получаю следующие исключения, когда запускаю этот код с некоторыми успешными запусками.
2019-01-18 00:39:34.716 INFO 38236 --- [ single-1] com.example.demo2.Demo2Application : {"id":"1049","name":"random1049"} 2019-01-18 00:39:34.731 ERROR 38236
--- [ single-1] reactor.core.scheduler.Schedulers : Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:8080 Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:8080 at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na] at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:636) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) ~[netty-common-4.1.31.Final.jar:4.1.31.Final] at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoCreate] : reactor.core.publisher.Mono.create(Mono.java:183) reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:121) Error has been observed by the following operator(s): |_ Mono.create ⇢ reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:121) |_ Mono.create ⇢ reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:289) |_ Mono.retry ⇢ reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:324) |_ Mono.flatMapMany ⇢ reactor.netty.http.client.HttpClientFinalizer.responseConnection(HttpClientFinalizer.java:85) |_ Flux.next ⇢ org.springframework.http.client.reactive.ReactorClientHttpConnector.connect(ReactorClientHttpConnector.java:112) |_ Mono.doOnRequest ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:104) |_ Mono.doOnCancel ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:105) |_ Mono.map ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106) |_ Mono.switchIfEmpty ⇢ org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:319) |_ Mono.flatMap ⇢ org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.bodyToMono(DefaultWebClient.java:429) |_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35) |_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35) |_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
|_ ParallelFlux.runOn ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:36)
Caused by: java.net.ConnectException: Connection refused: no further information ... 10 common frames omitted
Что я делаю неправильно, я пытался ограничить количество потоков, но это не помогло, как видно из кода клиента, где я использую один поток.
Любая помощь будет высоко ценится.
это ошибка?